You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/17 17:11:39 UTC
git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI to make
it easier to configure and allow 3rd party to plugin custom strategies. Work
in progress.
Updated Branches:
refs/heads/master 6f0d3368f -> d0235489f
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0235489
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0235489
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0235489
Branch: refs/heads/master
Commit: d0235489fd77031c70501d6e859807ae72117034
Parents: 6f0d336
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 17 16:05:42 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 17 17:11:28 2013 +0200
----------------------------------------------------------------------
.../converter/stream/CachedOutputStream.java | 49 ++++-----
.../apache/camel/impl/DefaultCamelContext.java | 6 --
.../impl/DefaultStreamCachingStrategy.java | 100 ++++++++++++++++++-
.../apache/camel/spi/StreamCachingStrategy.java | 46 +++++++++
.../java/org/apache/camel/util/IOHelper.java | 5 +-
.../stream/CachedOutputStreamTest.java | 56 ++++++++---
.../stream/StreamCacheConverterTest.java | 26 +++--
.../processor/SplitterStreamCacheTest.java | 4 +-
.../xml/AbstractCamelContextFactoryBean.java | 6 ++
9 files changed, 231 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index f12a111..1b7f90b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -31,6 +31,7 @@ import javax.crypto.CipherOutputStream;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
+import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
@@ -50,22 +51,22 @@ import org.slf4j.LoggerFactory;
* fileInputStream is closed after the exchange is completed.
*/
public class CachedOutputStream extends OutputStream {
+ @Deprecated
public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
+ @Deprecated
public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
+ @Deprecated
public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
+ @Deprecated
public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
+ private final StreamCachingStrategy strategy;
private OutputStream currentStream;
private boolean inMemory = true;
private int totalLength;
private File tempFile;
private FileInputStreamCache fileInputStreamCache;
-
- private long threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
- private int bufferSize = 2 * 1024;
- private File outputDir;
- private String cipherTransformation;
private CipherPair ciphers;
public CachedOutputStream(Exchange exchange) {
@@ -73,25 +74,8 @@ public class CachedOutputStream extends OutputStream {
}
public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
- // TODO: these options should be on StreamCachingStrategy
- String bufferSize = exchange.getContext().getProperty(BUFFER_SIZE);
- String hold = exchange.getContext().getProperty(THRESHOLD);
- String dir = exchange.getContext().getProperty(TEMP_DIR);
-
- if (bufferSize != null) {
- this.bufferSize = exchange.getContext().getTypeConverter().convertTo(Integer.class, bufferSize);
- }
- if (hold != null) {
- this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
- }
- if (dir != null) {
- this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
- } else {
- this.outputDir = exchange.getContext().getStreamCachingStrategy().getTemporaryDirectory();
- }
- this.cipherTransformation = exchange.getContext().getProperty(CIPHER_TRANSFORMATION);
-
- currentStream = new ByteArrayOutputStream(this.bufferSize);
+ this.strategy = exchange.getContext().getStreamCachingStrategy();
+ currentStream = new ByteArrayOutputStream(strategy.getBufferSize());
if (closedOnCompletion) {
// add on completion so we can cleanup after the exchange is done such as deleting temporary files
@@ -143,7 +127,7 @@ public class CachedOutputStream extends OutputStream {
public void write(byte[] b, int off, int len) throws IOException {
this.totalLength += len;
- if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
pageToFileStream();
}
currentStream.write(b, off, len);
@@ -151,7 +135,7 @@ public class CachedOutputStream extends OutputStream {
public void write(byte[] b) throws IOException {
this.totalLength += b.length;
- if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
pageToFileStream();
}
currentStream.write(b);
@@ -159,7 +143,7 @@ public class CachedOutputStream extends OutputStream {
public void write(int b) throws IOException {
this.totalLength++;
- if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
pageToFileStream();
}
currentStream.write(b);
@@ -224,7 +208,7 @@ public class CachedOutputStream extends OutputStream {
flush();
ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
- tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
+ tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getTemporaryDirectory());
LOG.trace("Creating temporary stream cache file: {}", tempFile);
@@ -236,9 +220,10 @@ public class CachedOutputStream extends OutputStream {
inMemory = false;
}
}
-
+
+ @Deprecated
public int getBufferSize() {
- return bufferSize;
+ return strategy.getBufferSize();
}
// This class will close the CachedOutputStream when it is closed
@@ -275,10 +260,10 @@ public class CachedOutputStream extends OutputStream {
private OutputStream createOutputStream(File file) throws IOException {
OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
- if (ObjectHelper.isNotEmpty(cipherTransformation)) {
+ if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
try {
if (ciphers == null) {
- ciphers = new CipherPair(cipherTransformation);
+ ciphers = new CipherPair(strategy.getSpoolChiper());
}
} catch (GeneralSecurityException e) {
throw new IOException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index e4017be..b977979 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -1676,12 +1676,6 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
}
if (streamCachingInUse) {
- Long threshold = CamelContextHelper.convertTo(this, Long.class, getProperties().get("CamelCachedOutputStreamThreshold"));
- if (threshold == null) {
- threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
- }
- log.info("Stream caching is enabled, and using {} kb as threshold for overflow and spooling to disk store.", threshold / 1024);
-
// stream caching is in use so enable the strategy
addService(streamCachingStrategy);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
index 3333d77..d5f558a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -18,19 +18,52 @@ package org.apache.camel.impl;
import java.io.File;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StreamCache;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements StreamCachingStrategy {
+/**
+ * Default implementation of {@link StreamCachingStrategy}
+ */
+public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
- // TODO: Add options to configure more stuff like overflow size et all
// TODO: Add JMX management
// TODO: Maybe use #syntax# for default temp dir so ppl can easily configure this
+ @Deprecated
+ public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
+ @Deprecated
+ public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
+ @Deprecated
+ public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
+ @Deprecated
+ public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
+
private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class);
+
+ private CamelContext camelContext;
private File temporaryDirectory;
+ private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
+ private String spoolChiper;
+ private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
+ private boolean removeTemporaryDirectoryWhenStopping = true;
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ public void setTemporaryDirectory(String path) {
+ this.temporaryDirectory = new File(path);
+ }
public void setTemporaryDirectory(File path) {
this.temporaryDirectory = path;
@@ -40,8 +73,60 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
return temporaryDirectory;
}
+ public long getSpoolThreshold() {
+ return spoolThreshold;
+ }
+
+ public void setSpoolThreshold(long spoolThreshold) {
+ this.spoolThreshold = spoolThreshold;
+ }
+
+ public String getSpoolChiper() {
+ return spoolChiper;
+ }
+
+ public void setSpoolChiper(String spoolChiper) {
+ this.spoolChiper = spoolChiper;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public boolean isRemoveTemporaryDirectoryWhenStopping() {
+ return removeTemporaryDirectoryWhenStopping;
+ }
+
+ public void setRemoveTemporaryDirectoryWhenStopping(boolean removeTemporaryDirectoryWhenStopping) {
+ this.removeTemporaryDirectoryWhenStopping = removeTemporaryDirectoryWhenStopping;
+ }
+
@Override
protected void doStart() throws Exception {
+ String bufferSize = camelContext.getProperty(BUFFER_SIZE);
+ String hold = camelContext.getProperty(THRESHOLD);
+ String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION);
+ String dir = camelContext.getProperty(TEMP_DIR);
+
+ if (bufferSize != null) {
+ this.bufferSize = camelContext.getTypeConverter().convertTo(Integer.class, bufferSize);
+ }
+ if (hold != null) {
+ this.spoolThreshold = camelContext.getTypeConverter().convertTo(Long.class, hold);
+ }
+ if (chiper != null) {
+ this.spoolChiper = chiper;
+ }
+ if (dir != null) {
+ this.temporaryDirectory = camelContext.getTypeConverter().convertTo(File.class, dir);
+ }
+
+ LOG.info("StreamCaching in use with {}", this.toString());
+
// create random temporary directory if none has been created
if (temporaryDirectory == null) {
temporaryDirectory = FileUtil.createNewTempDir();
@@ -60,9 +145,18 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
@Override
protected void doStop() throws Exception {
- if (temporaryDirectory != null) {
+ if (temporaryDirectory != null && isRemoveTemporaryDirectoryWhenStopping()) {
LOG.info("Removing temporary directory {}", temporaryDirectory);
FileUtil.removeDir(temporaryDirectory);
}
}
+
+ @Override
+ public String toString() {
+ return "DefaultStreamCachingStrategy["
+ + "temporaryDirectory=" + temporaryDirectory
+ + ", spoolThreshold=" + spoolThreshold
+ + ", spoolChiper=" + spoolChiper
+ + ", bufferSize=" + bufferSize + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index cc5ad2e..a5cfa1b 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -23,7 +23,53 @@ import java.io.File;
*/
public interface StreamCachingStrategy {
+ /**
+ * Sets the temporary directory to use for overflow and spooling to disk.
+ * <p/>
+ * If no temporary directory has been explicit configured, then a directory
+ * is created in the <tt>java.io.tmpdir</tt> directory.
+ */
void setTemporaryDirectory(File path);
File getTemporaryDirectory();
+
+ void setTemporaryDirectory(String path);
+
+ /**
+ * Threshold in bytes when overflow to disk is activated.
+ * <p/>
+ * The default threshold is {@link org.apache.camel.StreamCache#DEFAULT_SPOOL_THRESHOLD} bytes (eg 128kb).
+ * Use <tt>-1</tt> to disable overflow to disk.
+ */
+ void setSpoolThreshold(long threshold);
+
+ long getSpoolThreshold();
+
+ /**
+ * Sets the buffer size to use when copying between buffers.
+ * <p/>
+ * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE}
+ */
+ void setBufferSize(int bufferSize);
+
+ int getBufferSize();
+
+ /**
+ * Sets a chiper name to use when spooling to disk to write with encryption.
+ * <p/>
+ * By default the data is not encrypted.
+ */
+ void setSpoolChiper(String chiper);
+
+ String getSpoolChiper();
+
+ /**
+ * Whether to remove the temporary directory when stopping.
+ * <p/>
+ * This option is default <tt>true</tt>
+ */
+ void setRemoveTemporaryDirectoryWhenStopping(boolean remove);
+
+ boolean isRemoveTemporaryDirectoryWhenStopping();
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
index c381e90..76d1e5f 100644
--- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
@@ -44,9 +44,10 @@ import org.slf4j.LoggerFactory;
* @version
*/
public final class IOHelper {
-
+
+ public static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+
private static final transient Logger LOG = LoggerFactory.getLogger(IOHelper.class);
- private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
private IOHelper() {
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 46316f6..91729f1 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
@@ -38,11 +39,18 @@ public class CachedOutputStreamTest extends ContextTestSupport {
private Exchange exchange;
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.setStreamCaching(true);
+ context.getStreamCachingStrategy().setTemporaryDirectory("target/cachedir");
+ context.getStreamCachingStrategy().setSpoolThreshold(16);
+ return context;
+ }
+
protected void setUp() throws Exception {
super.setUp();
-
- context.getProperties().put(CachedOutputStream.TEMP_DIR, "target/cachedir");
- context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+
deleteDirectory("target/cachedir");
createDirectory("target/cachedir");
@@ -51,6 +59,11 @@ public class CachedOutputStreamTest extends ContextTestSupport {
exchange.setUnitOfWork(uow);
}
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
private static String toString(InputStream input) throws IOException {
BufferedReader reader = IOHelper.buffered(new InputStreamReader(input));
CollectionStringBuffer builder = new CollectionStringBuffer();
@@ -63,7 +76,9 @@ public class CachedOutputStreamTest extends ContextTestSupport {
}
}
- public void testCacheStreamToFileAndCloseStream() throws IOException {
+ public void testCacheStreamToFileAndCloseStream() throws Exception {
+ context.start();
+
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -96,9 +111,12 @@ public class CachedOutputStreamTest extends ContextTestSupport {
IOHelper.close(cos);
}
- public void testCacheStreamToFileAndCloseStreamEncrypted() throws IOException {
+ public void testCacheStreamToFileAndCloseStreamEncrypted() throws Exception {
// set some stream or 8-bit block cipher transformation name
- exchange.getContext().getProperties().put(CachedOutputStream.CIPHER_TRANSFORMATION, "RC4");
+ context.getStreamCachingStrategy().setSpoolChiper("RC4");
+
+ context.start();
+
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
cos.flush();
@@ -137,7 +155,9 @@ public class CachedOutputStreamTest extends ContextTestSupport {
IOHelper.close(cos);
}
- public void testCacheStreamToFileCloseStreamBeforeDone() throws IOException {
+ public void testCacheStreamToFileCloseStreamBeforeDone() throws Exception {
+ context.start();
+
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -163,8 +183,10 @@ public class CachedOutputStreamTest extends ContextTestSupport {
IOHelper.close(cos);
}
- public void testCacheStreamToMemory() throws IOException {
- context.getProperties().put(CachedOutputStream.THRESHOLD, "1024");
+ public void testCacheStreamToMemory() throws Exception {
+ context.getStreamCachingStrategy().setSpoolThreshold(1024);
+
+ context.start();
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -181,9 +203,11 @@ public class CachedOutputStreamTest extends ContextTestSupport {
IOHelper.close(cos);
}
- public void testCacheStreamToMemoryAsDiskIsdisabled() throws IOException {
+ public void testCacheStreamToMemoryAsDiskIsDisabled() throws Exception {
// -1 disables disk based cache
- context.getProperties().put(CachedOutputStream.THRESHOLD, "-1");
+ context.getStreamCachingStrategy().setSpoolThreshold(-1);
+
+ context.start();
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -202,14 +226,16 @@ public class CachedOutputStreamTest extends ContextTestSupport {
IOHelper.close(cos);
}
- public void testCachedOutputStreamCustomBufferSize() throws IOException {
+ public void testCachedOutputStreamCustomBufferSize() throws Exception {
// double the default buffer size
- context.getProperties().put(CachedOutputStream.BUFFER_SIZE, "4096");
-
+ context.getStreamCachingStrategy().setBufferSize(8192);
+
+ context.start();
+
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
- assertEquals("we should have a custom buffer size", cos.getBufferSize(), 4096);
+ assertEquals("we should have a custom buffer size", cos.getBufferSize(), 8192);
// make sure things still work after custom buffer size set
File file = new File("target/cachedir");
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
index aeb710f..b792d09 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
@@ -19,9 +19,6 @@ package org.apache.camel.converter.stream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
import javax.xml.transform.Source;
import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamSource;
@@ -51,6 +48,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToStreamCache() throws Exception {
+ context.start();
+
ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes());
StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange);
String message = exchange.getContext().getTypeConverter().convertTo(String.class, streamCache);
@@ -59,6 +58,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToStreamCacheStreamSource() throws Exception {
+ context.start();
+
StreamSource source = new StreamSource(getTestFileStream());
StreamCache cache = StreamCacheConverter.convertToStreamCache(source, exchange);
//assert re-readability of the cached StreamSource
@@ -69,6 +70,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToStreamCacheInputStream() throws Exception {
+ context.start();
+
InputStream is = getTestFileStream();
InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange);
//assert re-readability of the cached InputStream
@@ -77,10 +80,10 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToStreamCacheInputStreamWithFileCache() throws Exception {
- // set up the properties
- Map<String, String> properties = new HashMap<String, String>();
- properties.put(CachedOutputStream.THRESHOLD, "1");
- exchange.getContext().setProperties(properties);
+ exchange.getContext().getStreamCachingStrategy().setSpoolThreshold(1);
+
+ context.start();
+
InputStream is = getTestFileStream();
InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange);
assertNotNull(IOConverter.toString(cache, null));
@@ -96,6 +99,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToSerializable() throws Exception {
+ context.start();
+
InputStream is = getTestFileStream();
StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange);
Serializable ser = StreamCacheConverter.convertToSerializable(cache, exchange);
@@ -103,6 +108,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
}
public void testConvertToByteArray() throws Exception {
+ context.start();
+
InputStream is = getTestFileStream();
StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange);
byte[] bytes = StreamCacheConverter.convertToByteArray(cache, exchange);
@@ -114,4 +121,9 @@ public class StreamCacheConverterTest extends ContextTestSupport {
assertNotNull("Should have found the file: " + TEST_FILE + " on the classpath", answer);
return answer;
}
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
index 9f71441..d17410e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
@@ -48,8 +48,8 @@ public class SplitterStreamCacheTest extends ContextTestSupport {
return new RouteBuilder() {
public void configure() {
//ensure stream is spooled to disk
- getContext().getProperties().put(CachedOutputStream.TEMP_DIR, "target/tmp");
- getContext().getProperties().put(CachedOutputStream.THRESHOLD, "1");
+ context.getStreamCachingStrategy().setTemporaryDirectory("target/tmp");
+ context.getStreamCachingStrategy().setSpoolThreshold(-1);
from("seda:parallel?concurrentConsumers=5").streamCaching()
.split(XPathBuilder.xpath("//person/city"))
http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index d521e7d..1454233 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -79,6 +79,7 @@ import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanFilter;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.spi.UuidGenerator;
@@ -789,5 +790,10 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
LOG.info("Using custom NodeIdFactory: " + nodeIdFactory);
getContext().setNodeIdFactory(nodeIdFactory);
}
+ StreamCachingStrategy streamCachingStrategy = getBeanForType(StreamCachingStrategy.class);
+ if (streamCachingStrategy != null) {
+ LOG.info("Using custom StreamCachingStrategy: " + streamCachingStrategy);
+ getContext().setStreamCachingStrategy(streamCachingStrategy);
+ }
}
}