You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/07/14 12:07:34 UTC
svn commit: r963998 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
Author: ningjiang
Date: Wed Jul 14 10:07:34 2010
New Revision: 963998
URL: http://svn.apache.org/viewvc?rev=963998&view=rev
Log:
CAMEL-2947 CachedOutputStream can now be configured not to close itself when the exchange is completed
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=963998&r1=963997&r2=963998&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Jul 14 10:07:34 2010
@@ -56,8 +56,12 @@ public class CachedOutputStream extends
private long threshold = 64 * 1024;
private File outputDir;
-
+
public CachedOutputStream(Exchange exchange) {
+ this(exchange, true);
+ }
+
+ public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
String hold = exchange.getContext().getProperties().get(THRESHOLD);
String dir = exchange.getContext().getProperties().get(TEMP_DIR);
if (hold != null) {
@@ -67,25 +71,27 @@ public class CachedOutputStream extends
this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
}
- // add on completion so we can cleanup after the exchange is done such as deleting temporary files
- exchange.addOnCompletion(new SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- try {
- if (fileInputStreamCache != null) {
- fileInputStreamCache.close();
+ if (closedOnCompletion) {
+ // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ try {
+ if (fileInputStreamCache != null) {
+ fileInputStreamCache.close();
+ }
+ close();
+ } catch (Exception e) {
+ LOG.warn("Error deleting temporary cache file: " + tempFile, e);
}
- close();
- } catch (Exception e) {
- LOG.warn("Error deleting temporary cache file: " + tempFile, e);
}
- }
-
- @Override
- public String toString() {
- return "OnCompletion[CachedOutputStream]";
- }
- });
+
+ @Override
+ public String toString() {
+ return "OnCompletion[CachedOutputStream]";
+ }
+ });
+ }
}
public void flush() throws IOException {
@@ -152,6 +158,11 @@ public class CachedOutputStream extends
throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
}
}
+ }
+
+ public InputStream getWrappedInputStream() throws IOException {
+ // The WrappedInputStream will close the CachedOutputStream when it is closed
+ return new WrappedInputStream(this, getInputStream());
}
@@ -206,5 +217,37 @@ public class CachedOutputStream extends
inMemory = false;
}
}
+
+ // This class will close the CachedOutputStream when it is closed
+ private class WrappedInputStream extends InputStream {
+ private CachedOutputStream cachedOutputStream;
+ private InputStream inputStream;
+
+ WrappedInputStream(CachedOutputStream cos, InputStream is) {
+ cachedOutputStream = cos;
+ inputStream = is;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return inputStream.read();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return inputStream.available();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ inputStream.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ cachedOutputStream.close();
+ }
+ }
}
Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=963998&r1=963997&r2=963998&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Wed Jul 14 10:07:34 2010
@@ -228,9 +228,11 @@ public class HttpProducer extends Defaul
// As httpclient is using a AutoCloseInputStream, it will be closed when the connection is closed
// we need to cache the stream for it.
try {
- CachedOutputStream cos = new CachedOutputStream(exchange);
+ // This CachedOutputStream will not be closed when the exchange completes
+ CachedOutputStream cos = new CachedOutputStream(exchange, false);
IOHelper.copy(is, cos);
- return cos.getInputStream();
+ // When the InputStream is closed, the CachedOutputStream will be closed
+ return cos.getWrappedInputStream();
} finally {
IOHelper.close(is, "Extracting response body", LOG);
}