You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/07/26 13:29:54 UTC
svn commit: r797907 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/converter/stream/
camel-core/src/main/java/org/apache/camel/util/
camel-core/src/test/java/org/apache/camel/converter/stream/
camel-core/src/test/java/org/apache/camel/iss...
Author: davsclaus
Date: Sun Jul 26 11:29:54 2009
New Revision: 797907
URL: http://svn.apache.org/viewvc?rev=797907&view=rev
Log:
CAMEL-1849: Cleanup in CachedOutputStream. The HTTP component now uses a defensive copy of the response body stream to avoid returning a live stream that has already been closed. Also ensured that the file-based stream cache always deletes its temp files, as this is now done by the on-completion strategy.
Added:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Sun Jul 26 11:29:54 2009
@@ -20,20 +20,19 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
-import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* This output stream will store the content into a File if the stream context size is exceed the
@@ -44,378 +43,155 @@
* output stream will be deleted when you close this output stream or the cached inputStream.
*/
public class CachedOutputStream extends OutputStream {
+ private static final transient Log LOG = LogFactory.getLog(CachedOutputStream.class);
+
public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
- protected boolean outputLocked;
- protected OutputStream currentStream;
-
- private final List<Object> streamList = new ArrayList<Object>();
- private long threshold = 64 * 1024;
+ private OutputStream currentStream = new ByteArrayOutputStream(2048);
+ private boolean inMemory = true;
private int totalLength;
- private boolean inMemory;
private File tempFile;
+
+ private long threshold = 64 * 1024;
private File outputDir;
- public CachedOutputStream() {
- currentStream = new ByteArrayOutputStream(2048);
- inMemory = true;
- }
-
- public CachedOutputStream(long threshold) {
- this();
- this.threshold = threshold;
- }
-
- public CachedOutputStream(Map<String, String> properties) {
- this();
- String value = properties.get(THRESHOLD);
- if (value != null) {
- int i = Integer.parseInt(value);
- if (i > 0) {
- threshold = i;
- }
- }
- value = properties.get(TEMP_DIR);
- if (value != null) {
- File f = new File(value);
- if (f.exists() && f.isDirectory()) {
- outputDir = f;
- } else {
- outputDir = null;
+ public CachedOutputStream(Exchange exchange) {
+ String hold = exchange.getContext().getProperties().get(THRESHOLD);
+ String dir = exchange.getContext().getProperties().get(TEMP_DIR);
+ if (hold != null) {
+ this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
+ }
+ if (dir != null) {
+ this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
+ }
+
+ // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ try {
+ // cleanup temporary file
+ if (tempFile != null) {
+ boolean deleted = tempFile.delete();
+ if (!deleted) {
+ LOG.warn("Cannot delete temporary cache file: " + tempFile);
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace("Deleted temporary cache file: " + tempFile);
+ }
+ tempFile = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error deleting temporary cache file: " + tempFile, e);
+ }
}
- } else {
- outputDir = null;
- }
- }
- /**
- * Perform any actions required on stream flush (freeze headers, reset
- * output stream ... etc.)
- */
- protected void doFlush() throws IOException {
+ @Override
+ public String toString() {
+ return "OnCompletion[CachedOutputStream]";
+ }
+ });
}
public void flush() throws IOException {
currentStream.flush();
- doFlush();
}
- /**
- * Perform any actions required on stream closure (handle response etc.)
- */
- protected void doClose() throws IOException {
- }
-
- /**
- * Perform any actions required after stream closure (close the other related stream etc.)
- */
- protected void postClose() throws IOException {
- }
-
- /**
- * Locks the output stream to prevent additional writes, but maintains
- * a pointer to it so an InputStream can be obtained
- * @throws IOException
- */
- public void lockOutputStream() throws IOException {
- currentStream.flush();
- outputLocked = true;
- streamList.remove(currentStream);
- }
-
public void close() throws IOException {
- currentStream.flush();
- doClose();
currentStream.close();
- maybeDeleteTempFile(currentStream);
- postClose();
}
public boolean equals(Object obj) {
return currentStream.equals(obj);
}
- /**
- * Replace the original stream with the new one, optionally copying the content of the old one
- * into the new one.
- * When with Attachment, needs to replace the xml writer stream with the stream used by
- * AttachmentSerializer or copy the cached output stream to the "real"
- * output stream, i.e. onto the wire.
- *
- * @param out the new output stream
- * @param copyOldContent flag indicating if the old content should be copied
- * @throws IOException
- */
- public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
- if (out == null) {
- out = new ByteArrayOutputStream();
- }
-
- if (currentStream instanceof CachedOutputStream) {
- CachedOutputStream ac = (CachedOutputStream) currentStream;
- InputStream in = ac.getInputStream();
- IOHelper.copyAndCloseInput(in, out);
- } else {
- if (inMemory) {
- if (currentStream instanceof ByteArrayOutputStream) {
- ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
- if (copyOldContent && byteOut.size() > 0) {
- byteOut.writeTo(out);
- }
- } else {
- throw new IOException("Unknown format of currentStream: " + currentStream);
- }
- } else {
- // read the file
- currentStream.close();
- FileInputStream fin = new FileInputStream(tempFile);
- if (copyOldContent) {
- IOHelper.copyAndCloseInput(fin, out);
- }
- streamList.remove(currentStream);
- tempFile.delete();
- tempFile = null;
- inMemory = true;
- }
- }
- currentStream = out;
- outputLocked = false;
- }
-
- public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
- IOHelper.copyAndCloseInput(in, out, bufferSize);
- }
-
- public int size() {
- return totalLength;
- }
-
- public byte[] getBytes() throws IOException {
- flush();
- if (inMemory) {
- if (currentStream instanceof ByteArrayOutputStream) {
- return ((ByteArrayOutputStream)currentStream).toByteArray();
- } else {
- throw new IOException("Unknown format of currentStream");
- }
- } else {
- // read the file
- FileInputStream fin = new FileInputStream(tempFile);
- return IOConverter.toBytes(fin);
- }
- }
-
- public void writeCacheTo(OutputStream out) throws IOException {
- flush();
- if (inMemory) {
- if (currentStream instanceof ByteArrayOutputStream) {
- ((ByteArrayOutputStream)currentStream).writeTo(out);
- } else {
- throw new IOException("Unknown format of currentStream");
- }
- } else {
- // read the file
- FileInputStream fin = new FileInputStream(tempFile);
- IOHelper.copyAndCloseInput(fin, out);
- }
- }
-
-
- public void writeCacheTo(StringBuilder out, int limit) throws IOException {
- flush();
- if (totalLength < limit
- || limit == -1) {
- writeCacheTo(out);
- return;
- }
-
- int count = 0;
- if (inMemory) {
- if (currentStream instanceof ByteArrayOutputStream) {
- byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
- out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
- } else {
- throw new IOException("Unknown format of currentStream: " + currentStream);
- }
- } else {
- // read the file
- FileInputStream fin = new FileInputStream(tempFile);
- byte bytes[] = new byte[1024];
- int x = fin.read(bytes);
- while (x != -1) {
- if ((count + x) > limit) {
- x = limit - count;
- }
- out.append(IOHelper.newStringFromBytes(bytes, 0, x));
- count += x;
-
- if (count >= limit) {
- x = -1;
- } else {
- x = fin.read(bytes);
- }
- }
- fin.close();
- }
- }
- public void writeCacheTo(StringBuilder out) throws IOException {
- flush();
- if (inMemory) {
- if (currentStream instanceof ByteArrayOutputStream) {
- byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
- out.append(IOHelper.newStringFromBytes(bytes));
- } else {
- throw new IOException("Unknown format of currentStream: " + currentStream);
- }
- } else {
- // read the file
- FileInputStream fin = new FileInputStream(tempFile);
- byte bytes[] = new byte[1024];
- int x = fin.read(bytes);
- while (x != -1) {
- out.append(IOHelper.newStringFromBytes(bytes, 0, x));
- x = fin.read(bytes);
- }
- fin.close();
- }
- }
-
-
- /**
- * @return the underlying output stream
- */
- public OutputStream getOut() {
- return currentStream;
- }
-
public int hashCode() {
return currentStream.hashCode();
}
public String toString() {
- StringBuilder builder = new StringBuilder().append("[")
- .append(CachedOutputStream.class.getName())
- .append(" Content: ");
- try {
- writeCacheTo(builder);
- } catch (IOException e) {
- //ignore
- }
- return builder.append("]").toString();
- }
-
- protected void onWrite() throws IOException {
+ return "CachedOutputStream[size: " + totalLength + "]";
}
public void write(byte[] b, int off, int len) throws IOException {
- if (!outputLocked) {
- onWrite();
- this.totalLength += len;
- if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
- createFileOutputStream();
- }
- currentStream.write(b, off, len);
+ this.totalLength += len;
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ pageToFileStream();
}
+ currentStream.write(b, off, len);
+ flush();
}
public void write(byte[] b) throws IOException {
- if (!outputLocked) {
- onWrite();
- this.totalLength += b.length;
- if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
- createFileOutputStream();
- }
- currentStream.write(b);
+ this.totalLength += b.length;
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ pageToFileStream();
}
+ currentStream.write(b);
+ flush();
}
public void write(int b) throws IOException {
- if (!outputLocked) {
- onWrite();
- this.totalLength++;
- if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
- createFileOutputStream();
- }
- currentStream.write(b);
- }
- }
-
- private void createFileOutputStream() throws IOException {
- ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
- if (outputDir == null) {
- tempFile = FileUtil.createTempFile("cos", "tmp");
- } else {
- tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
+ this.totalLength++;
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ pageToFileStream();
}
-
- currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
- bout.writeTo(currentStream);
- inMemory = false;
- streamList.add(currentStream);
- }
-
- public File getTempFile() {
- return tempFile != null && tempFile.exists() ? tempFile : null;
+ currentStream.write(b);
+ flush();
}
public InputStream getInputStream() throws IOException {
- flush();
if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
} else {
- return null;
+ throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
}
} else {
try {
- FileInputStream fileInputStream = new FileInputStream(tempFile) {
- public void close() throws IOException {
- super.close();
- maybeDeleteTempFile(this);
- }
- };
- streamList.add(fileInputStream);
- return fileInputStream;
+ return new FileInputStreamCache(tempFile, this);
} catch (FileNotFoundException e) {
- throw IOHelper.createIOException("Cached file was already deleted", e);
+ throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
}
}
}
-
+
+
public StreamCache getStreamCache() throws IOException {
- flush();
if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
} else {
- return null;
+ throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
}
} else {
try {
return new FileInputStreamCache(tempFile, this);
} catch (FileNotFoundException e) {
- throw IOHelper.createIOException("Cached file was already deleted", e);
+ throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
}
}
}
-
- private void maybeDeleteTempFile(Object stream) {
- streamList.remove(stream);
- if (!inMemory && tempFile != null && streamList.isEmpty()) {
- tempFile.delete();
- tempFile = null;
- currentStream = new ByteArrayOutputStream(1024);
- inMemory = true;
+
+ private void pageToFileStream() throws IOException {
+ ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
+ if (outputDir == null) {
+ tempFile = FileUtil.createTempFile("cos", ".tmp");
+ } else {
+ tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
}
- }
- public void setOutputDir(File outputDir) throws IOException {
- this.outputDir = outputDir;
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Creating temporary stream cache file: " + tempFile);
+ }
- public void setThreshold(long threshold) {
- this.threshold = threshold;
+ try {
+ currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
+ bout.writeTo(currentStream);
+ } finally {
+ // ensure flag is flipped to file based
+ inMemory = false;
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Sun Jul 26 11:29:54 2009
@@ -32,9 +32,6 @@
private CachedOutputStream cachedOutputStream;
private File file;
- public FileInputStreamCache() {
- }
-
public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
this.file = file;
this.cachedOutputStream = cos;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Sun Jul 26 11:29:54 2009
@@ -64,8 +64,7 @@
@Converter
public StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
- // set up CachedOutputStream with the properties
- CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+ CachedOutputStream cos = new CachedOutputStream(exchange);
IOHelper.copyAndCloseInput(stream, cos);
return cos.getStreamCache();
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java Sun Jul 26 11:29:54 2009
@@ -39,7 +39,7 @@
public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
if (source.getInputStream() != null) {
// set up CachedOutputStream with the properties
- CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+ CachedOutputStream cos = new CachedOutputStream(exchange);
IOHelper.copyAndCloseInput(source.getInputStream(), cos);
streamCache = cos.getStreamCache();
readCache = null;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java Sun Jul 26 11:29:54 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Locale;
+import java.util.Random;
import java.util.Stack;
/**
@@ -44,53 +45,9 @@
return path;
}
- private static synchronized File getDefaultTempDir() {
- if (defaultTempDir != null
- && defaultTempDir.exists()) {
- return defaultTempDir;
- }
-
- String s = null;
- try {
- s = System.getProperty(FileUtil.class.getName() + ".TempDirectory");
- } catch (SecurityException e) {
- //Ignorable, we'll use the default
- }
- if (s == null) {
- int x = (int)(Math.random() * 1000000);
- s = System.getProperty("java.io.tmpdir");
- File checkExists = new File(s);
- if (!checkExists.exists()) {
- throw new RuntimeException("The directory "
- + checkExists.getAbsolutePath()
- + " does not exist, please set java.io.tempdir"
- + " to an existing directory");
- }
- File f = new File(s, "camel-tmp-" + x);
- while (!f.mkdir()) {
- x = (int)(Math.random() * 1000000);
- f = new File(s, "camel-tmp-" + x);
- }
- defaultTempDir = f;
- Thread hook = new Thread() {
- @Override
- public void run() {
- removeDir(defaultTempDir);
- }
- };
- Runtime.getRuntime().addShutdownHook(hook);
- } else {
- //assume someone outside of us will manage the directory
- File f = new File(s);
- f.mkdirs();
- defaultTempDir = f;
- }
- return defaultTempDir;
- }
-
public static void mkDir(File dir) {
if (dir == null) {
- throw new RuntimeException("dir attribute is required");
+ throw new IllegalArgumentException("dir attribute is required");
}
if (dir.isFile()) {
@@ -130,8 +87,7 @@
if (list == null) {
list = new String[0];
}
- for (int i = 0; i < list.length; i++) {
- String s = list[i];
+ for (String s : list) {
File f = new File(d, s);
if (f.isDirectory()) {
removeDir(f);
@@ -164,15 +120,11 @@
}
public static File createTempFile(String prefix, String suffix) throws IOException {
- return createTempFile(prefix, suffix, null, false);
+ return createTempFile(prefix, suffix, null);
}
-
- public static File createTempFile(String prefix, String suffix, File parentDir,
- boolean deleteOnExit) throws IOException {
- File result = null;
- File parent = (parentDir == null)
- ? getDefaultTempDir()
- : parentDir;
+
+ public static File createTempFile(String prefix, String suffix, File parentDir) throws IOException {
+ File parent = (parentDir == null) ? getDefaultTempDir() : parentDir;
if (suffix == null) {
suffix = ".tmp";
@@ -182,15 +134,11 @@
} else if (prefix.length() < 3) {
prefix = prefix + "camel";
}
- result = File.createTempFile(prefix, suffix, parent);
- //if parentDir is null, we're in our default dir
- //which will get completely wiped on exit from our exit
- //hook. No need to set deleteOnExit() which leaks memory.
- if (deleteOnExit && parentDir != null) {
- result.deleteOnExit();
- }
- return result;
+ // create parent folder
+ parent.mkdirs();
+
+ return File.createTempFile(prefix, suffix, parent);
}
/**
@@ -286,4 +234,42 @@
return sb.toString();
}
+ private static synchronized File getDefaultTempDir() {
+ if (defaultTempDir != null && defaultTempDir.exists()) {
+ return defaultTempDir;
+ }
+
+ String s = System.getProperty("java.io.tmpdir");
+ File checkExists = new File(s);
+ if (!checkExists.exists()) {
+ throw new RuntimeException("The directory "
+ + checkExists.getAbsolutePath()
+ + " does not exist, please set java.io.tempdir"
+ + " to an existing directory");
+ }
+
+ // why do we create another tmp folder
+ Random ran = new Random();
+ int x = ran.nextInt(1000000);
+
+ File f = new File(s, "camel-tmp-" + x);
+ while (!f.mkdir()) {
+ x = ran.nextInt(1000000);
+ f = new File(s, "camel-tmp-" + x);
+ }
+
+ defaultTempDir = f;
+
+ // create shutdown hook to remove the temp dir
+ Thread hook = new Thread() {
+ @Override
+ public void run() {
+ removeDir(defaultTempDir);
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(hook);
+
+ return defaultTempDir;
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java Sun Jul 26 11:29:54 2009
@@ -22,28 +22,35 @@
import java.io.InputStream;
import java.io.InputStreamReader;
-import junit.framework.TestCase;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.CollectionStringBuffer;
-public class CachedOutputStreamTest extends TestCase {
+public class CachedOutputStreamTest extends ContextTestSupport {
private static final String TEST_STRING = "This is a test string and it has enough"
+ " aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa ";
-
- private File file = new File("./target/cacheFile");
- private static void deleteDirectory(File file) {
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- for (int i = 0; i < files.length; i++) {
- deleteDirectory(files[i]);
- }
- }
- file.delete();
+ private Exchange exchange;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ context.getProperties().put(CachedOutputStream.TEMP_DIR, "./target/cachedir");
+ context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+ deleteDirectory("./target/cachedir");
+ createDirectory("./target/cachedir");
+
+ exchange = new DefaultExchange(context);
+ UnitOfWork uow = new DefaultUnitOfWork(exchange);
+ exchange.setUnitOfWork(uow);
}
-
- private static String toString(InputStream input) throws IOException {
+
+ private static String toString(InputStream input) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
CollectionStringBuffer builder = new CollectionStringBuffer("\n");
while (true) {
@@ -54,18 +61,12 @@
builder.append(line);
}
}
-
- protected void setUp() throws Exception {
- if (file.exists()) {
- deleteDirectory(file);
- }
- file.mkdirs();
- }
-
+
public void testCacheStreamToFileAndCloseStream() throws IOException {
- CachedOutputStream cos = new CachedOutputStream(16);
- cos.setOutputDir(file);
- cos.write(TEST_STRING.getBytes("UTF-8"));
+ CachedOutputStream cos = new CachedOutputStream(exchange);
+ cos.write(TEST_STRING.getBytes("UTF-8"));
+
+ File file = new File("./target/cachedir");
String[] files = file.list();
assertEquals("we should have a temp file", files.length, 1);
assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
@@ -73,8 +74,12 @@
StreamCache cache = cos.getStreamCache();
assertTrue("Should get the FileInputStreamCache", cache instanceof FileInputStreamCache);
String temp = toString((InputStream)cache);
+
((InputStream)cache).close();
assertEquals("Cached a wrong file", temp, TEST_STRING);
+
+ exchange.getUnitOfWork().done(exchange);
+
try {
cache.reset();
// The stream is closed, so the temp file is gone.
@@ -82,14 +87,17 @@
} catch (Exception exception) {
// do nothing
}
+
+
files = file.list();
assertEquals("we should have no temp file", files.length, 0);
}
- public void testCacheStreamToFileAndNotCloseStream() throws IOException {
- CachedOutputStream cos = new CachedOutputStream(16);
- cos.setOutputDir(file);
- cos.write(TEST_STRING.getBytes("UTF-8"));
+ public void testCacheStreamToFileAndNotCloseStream() throws IOException {
+ CachedOutputStream cos = new CachedOutputStream(exchange);
+ cos.write(TEST_STRING.getBytes("UTF-8"));
+
+ File file = new File("./target/cachedir");
String[] files = file.list();
assertEquals("we should have a temp file", files.length, 1);
assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
@@ -102,20 +110,28 @@
temp = toString((InputStream)cache);
assertEquals("Cached a wrong file", temp, TEST_STRING);
+ exchange.getUnitOfWork().done(exchange);
+
((InputStream)cache).close();
files = file.list();
assertEquals("we should have no temp file", files.length, 0);
}
public void testCacheStreamToMemory() throws IOException {
- CachedOutputStream cos = new CachedOutputStream();
- cos.setOutputDir(file);
- cos.write(TEST_STRING.getBytes("UTF-8"));
+ context.getProperties().put(CachedOutputStream.THRESHOLD, "1024");
+
+ CachedOutputStream cos = new CachedOutputStream(exchange);
+ cos.write(TEST_STRING.getBytes("UTF-8"));
+
+ File file = new File("./target/cachedir");
String[] files = file.list();
+
assertEquals("we should have no temp file", files.length, 0);
StreamCache cache = cos.getStreamCache();
assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
String temp = IOConverter.toString((InputStream)cache);
assertEquals("Cached a wrong file", temp, TEST_STRING);
+
+ exchange.getUnitOfWork().done(exchange);
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java Sun Jul 26 11:29:54 2009
@@ -76,6 +76,7 @@
// since the stream is closed you delete the temp file
// reset will not work any more
cache.reset();
+ exchange.getUnitOfWork().done(exchange);
fail("except the exception here");
} catch (Exception exception) {
// do nothing
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Sun Jul 26 11:29:54 2009
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.issues;
import java.io.ByteArrayInputStream;
Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Sun Jul 26 11:29:54 2009
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
+import java.io.ByteArrayInputStream;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -119,20 +120,27 @@
HttpOperationFailedException exception;
Header[] headers = method.getResponseHeaders();
InputStream is = extractResponseBody(method, exchange);
+ // make a defensive copy of the response body in the exception so its detached from the cache
+ InputStream copy = null;
+ if (is != null) {
+ copy = new ByteArrayInputStream(exchange.getContext().getTypeConverter().convertTo(byte[].class, is));
+ }
+
if (responseCode >= 300 && responseCode < 400) {
String redirectLocation;
Header locationHeader = method.getResponseHeader("location");
if (locationHeader != null) {
redirectLocation = locationHeader.getValue();
- exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), redirectLocation, headers, is);
+ exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), redirectLocation, headers, copy);
} else {
// no redirect location
- exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, is);
+ exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, copy);
}
} else {
// internal server error (error code 500)
- exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, is);
+ exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, copy);
}
+
return exception;
}
@@ -169,7 +177,7 @@
private static InputStream doExtractResponseBody(InputStream is, Exchange exchange) throws IOException {
try {
- CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+ CachedOutputStream cos = new CachedOutputStream(exchange);
IOHelper.copy(is, cos);
return cos.getInputStream();
} finally {
Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java?rev=797907&view=auto
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java (added)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java Sun Jul 26 11:29:54 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.File;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.http.HttpOperationFailedException;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that temporary files created by the file based stream cache are
+ * deleted once the exchange is done, both for successful HTTP responses and
+ * for failed ones.
+ *
+ * @version $Revision$
+ */
+public class HttpStreamCacheFileTest extends CamelTestSupport {
+
+    private static final String CACHE_DIR = "./target/cachedir";
+
+    // longer than the threshold configured in the route so the response is cached to a file
+    private static final String RESPONSE_BODY = "12345678901234567890123456789012345678901234567890";
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // start from an empty cache directory so leftovers cannot affect the assertions
+        deleteDirectory(CACHE_DIR);
+        createDirectory(CACHE_DIR);
+        super.setUp();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+        String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertCacheDirIsEmpty();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfException() throws Exception {
+        try {
+            template.requestBody("direct:start", null, String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            HttpOperationFailedException hofe = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
+            // the exception carries the response body, which must still be convertible here
+            String s = context.getTypeConverter().convertTo(String.class, hofe.getResponseBody());
+            assertEquals("Response body", RESPONSE_BODY, s);
+        }
+
+        assertCacheDirIsEmpty();
+    }
+
+    /**
+     * Asserts that the cache directory exists and contains no temporary files.
+     */
+    private void assertCacheDirIsEmpty() {
+        String[] files = new File(CACHE_DIR).list();
+        // File.list() returns null when the path is not a readable directory
+        assertNotNull("Cache directory should exist", files);
+        assertEquals("There should be no files", 0, files.length);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable stream caching and use a low threshold so it is forced to write to file
+                context.getProperties().put(CachedOutputStream.TEMP_DIR, CACHE_DIR);
+                context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+                context.setStreamCaching(true);
+
+                // use a route so we have a unit of work
+                from("direct:start").to("http://localhost:8123/myserver");
+
+                from("jetty://http://localhost:8123/myserver")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            if (exchange.getIn().getBody() == null) {
+                                exchange.getOut().setBody(RESPONSE_BODY);
+                                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
+                            } else {
+                                exchange.getOut().setBody("Bye World");
+                            }
+                        }
+                    });
+            }
+        };
+    }
+
+}
Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date