You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/17 17:11:39 UTC

git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.

Updated Branches:
  refs/heads/master 6f0d3368f -> d0235489f


CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0235489
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0235489
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0235489

Branch: refs/heads/master
Commit: d0235489fd77031c70501d6e859807ae72117034
Parents: 6f0d336
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 17 16:05:42 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 17 17:11:28 2013 +0200

----------------------------------------------------------------------
 .../converter/stream/CachedOutputStream.java    |  49 ++++-----
 .../apache/camel/impl/DefaultCamelContext.java  |   6 --
 .../impl/DefaultStreamCachingStrategy.java      | 100 ++++++++++++++++++-
 .../apache/camel/spi/StreamCachingStrategy.java |  46 +++++++++
 .../java/org/apache/camel/util/IOHelper.java    |   5 +-
 .../stream/CachedOutputStreamTest.java          |  56 ++++++++---
 .../stream/StreamCacheConverterTest.java        |  26 +++--
 .../processor/SplitterStreamCacheTest.java      |   4 +-
 .../xml/AbstractCamelContextFactoryBean.java    |   6 ++
 9 files changed, 231 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index f12a111..1b7f90b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -31,6 +31,7 @@ import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
+import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -50,22 +51,22 @@ import org.slf4j.LoggerFactory;
  * fileInputStream is closed after the exchange is completed.
  */
 public class CachedOutputStream extends OutputStream {
+    @Deprecated
     public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
+    @Deprecated
     public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
+    @Deprecated
     public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
+    @Deprecated
     public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
     private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
 
+    private final StreamCachingStrategy strategy;
     private OutputStream currentStream;
     private boolean inMemory = true;
     private int totalLength;
     private File tempFile;
     private FileInputStreamCache fileInputStreamCache;
-
-    private long threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
-    private int bufferSize = 2 * 1024;
-    private File outputDir;
-    private String cipherTransformation;
     private CipherPair ciphers;
 
     public CachedOutputStream(Exchange exchange) {
@@ -73,25 +74,8 @@ public class CachedOutputStream extends OutputStream {
     }
 
     public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
-        // TODO: these options should be on StreamCachingStrategy
-        String bufferSize = exchange.getContext().getProperty(BUFFER_SIZE);
-        String hold = exchange.getContext().getProperty(THRESHOLD);
-        String dir = exchange.getContext().getProperty(TEMP_DIR);
-        
-        if (bufferSize != null) {
-            this.bufferSize = exchange.getContext().getTypeConverter().convertTo(Integer.class, bufferSize);
-        }
-        if (hold != null) {
-            this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
-        }
-        if (dir != null) {
-            this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
-        } else {
-            this.outputDir = exchange.getContext().getStreamCachingStrategy().getTemporaryDirectory();
-        }
-        this.cipherTransformation = exchange.getContext().getProperty(CIPHER_TRANSFORMATION);
-       
-        currentStream = new ByteArrayOutputStream(this.bufferSize);
+        this.strategy = exchange.getContext().getStreamCachingStrategy();
+        currentStream = new ByteArrayOutputStream(strategy.getBufferSize());
         
         if (closedOnCompletion) {
             // add on completion so we can cleanup after the exchange is done such as deleting temporary files
@@ -143,7 +127,7 @@ public class CachedOutputStream extends OutputStream {
 
     public void write(byte[] b, int off, int len) throws IOException {
         this.totalLength += len;
-        if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+        if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
             pageToFileStream();
         }
         currentStream.write(b, off, len);
@@ -151,7 +135,7 @@ public class CachedOutputStream extends OutputStream {
 
     public void write(byte[] b) throws IOException {
         this.totalLength += b.length;
-        if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+        if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
             pageToFileStream();
         }
         currentStream.write(b);
@@ -159,7 +143,7 @@ public class CachedOutputStream extends OutputStream {
 
     public void write(int b) throws IOException {
         this.totalLength++;
-        if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+        if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) {
             pageToFileStream();
         }
         currentStream.write(b);
@@ -224,7 +208,7 @@ public class CachedOutputStream extends OutputStream {
         flush();
 
         ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
-        tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
+        tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getTemporaryDirectory());
 
         LOG.trace("Creating temporary stream cache file: {}", tempFile);
 
@@ -236,9 +220,10 @@ public class CachedOutputStream extends OutputStream {
             inMemory = false;
         }
     }
-    
+
+    @Deprecated
     public int getBufferSize() {
-        return bufferSize;
+        return strategy.getBufferSize();
     }
 
     // This class will close the CachedOutputStream when it is closed
@@ -275,10 +260,10 @@ public class CachedOutputStream extends OutputStream {
 
     private OutputStream createOutputStream(File file) throws IOException {
         OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
-        if (ObjectHelper.isNotEmpty(cipherTransformation)) {
+        if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
             try {
                 if (ciphers == null) {
-                    ciphers = new CipherPair(cipherTransformation);
+                    ciphers = new CipherPair(strategy.getSpoolChiper());
                 }
             } catch (GeneralSecurityException e) {
                 throw new IOException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index e4017be..b977979 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -1676,12 +1676,6 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
             }
         }
         if (streamCachingInUse) {
-            Long threshold = CamelContextHelper.convertTo(this, Long.class, getProperties().get("CamelCachedOutputStreamThreshold"));
-            if (threshold == null) {
-                threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
-            }
-            log.info("Stream caching is enabled, and using {} kb as threshold for overflow and spooling to disk store.", threshold / 1024);
-
             // stream caching is in use so enable the strategy
             addService(streamCachingStrategy);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
index 3333d77..d5f558a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -18,19 +18,52 @@ package org.apache.camel.impl;
 
 import java.io.File;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StreamCache;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements StreamCachingStrategy {
+/**
+ * Default implementation of {@link StreamCachingStrategy}
+ */
+public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
 
-    // TODO: Add options to configure more stuff like overflow size et all
     // TODO: Add JMX management
     // TODO: Maybe use #syntax# for default temp dir so ppl can easily configure this
 
+    @Deprecated
+    public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
+    @Deprecated
+    public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
+    @Deprecated
+    public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
+    @Deprecated
+    public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
+
     private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class);
+
+    private CamelContext camelContext;
     private File temporaryDirectory;
+    private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
+    private String spoolChiper;
+    private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
+    private boolean removeTemporaryDirectoryWhenStopping = true;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public void setTemporaryDirectory(String path) {
+        this.temporaryDirectory = new File(path);
+    }
 
     public void setTemporaryDirectory(File path) {
         this.temporaryDirectory = path;
@@ -40,8 +73,60 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         return temporaryDirectory;
     }
 
+    public long getSpoolThreshold() {
+        return spoolThreshold;
+    }
+
+    public void setSpoolThreshold(long spoolThreshold) {
+        this.spoolThreshold = spoolThreshold;
+    }
+
+    public String getSpoolChiper() {
+        return spoolChiper;
+    }
+
+    public void setSpoolChiper(String spoolChiper) {
+        this.spoolChiper = spoolChiper;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public boolean isRemoveTemporaryDirectoryWhenStopping() {
+        return removeTemporaryDirectoryWhenStopping;
+    }
+
+    public void setRemoveTemporaryDirectoryWhenStopping(boolean removeTemporaryDirectoryWhenStopping) {
+        this.removeTemporaryDirectoryWhenStopping = removeTemporaryDirectoryWhenStopping;
+    }
+
     @Override
     protected void doStart() throws Exception {
+        String bufferSize = camelContext.getProperty(BUFFER_SIZE);
+        String hold = camelContext.getProperty(THRESHOLD);
+        String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION);
+        String dir = camelContext.getProperty(TEMP_DIR);
+
+        if (bufferSize != null) {
+            this.bufferSize = camelContext.getTypeConverter().convertTo(Integer.class, bufferSize);
+        }
+        if (hold != null) {
+            this.spoolThreshold = camelContext.getTypeConverter().convertTo(Long.class, hold);
+        }
+        if (chiper != null) {
+            this.spoolChiper = chiper;
+        }
+        if (dir != null) {
+            this.temporaryDirectory = camelContext.getTypeConverter().convertTo(File.class, dir);
+        }
+
+        LOG.info("StreamCaching in use with {}", this.toString());
+
         // create random temporary directory if none has been created
         if (temporaryDirectory == null) {
             temporaryDirectory = FileUtil.createNewTempDir();
@@ -60,9 +145,18 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
 
     @Override
     protected void doStop() throws Exception {
-        if (temporaryDirectory != null) {
+        if (temporaryDirectory != null  && isRemoveTemporaryDirectoryWhenStopping()) {
             LOG.info("Removing temporary directory {}", temporaryDirectory);
             FileUtil.removeDir(temporaryDirectory);
         }
     }
+
+    @Override
+    public String toString() {
+        return "DefaultStreamCachingStrategy["
+            + "temporaryDirectory=" + temporaryDirectory
+            + ", spoolThreshold=" + spoolThreshold
+            + ", spoolChiper=" + spoolChiper
+            + ", bufferSize=" + bufferSize + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index cc5ad2e..a5cfa1b 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -23,7 +23,53 @@ import java.io.File;
  */
 public interface StreamCachingStrategy {
 
+    /**
+     * Sets the temporary directory to use for overflow and spooling to disk.
+     * <p/>
+     * If no temporary directory has been explicitly configured, then a directory
+     * is created in the <tt>java.io.tmpdir</tt> directory.
+     */
     void setTemporaryDirectory(File path);
 
     File getTemporaryDirectory();
+
+    void setTemporaryDirectory(String path);
+
+    /**
+     * Threshold in bytes when overflow to disk is activated.
+     * <p/>
+     * The default threshold is {@link org.apache.camel.StreamCache#DEFAULT_SPOOL_THRESHOLD} bytes (e.g. 128kb).
+     * Use <tt>-1</tt> to disable overflow to disk.
+     */
+    void setSpoolThreshold(long threshold);
+
+    long getSpoolThreshold();
+
+    /**
+     * Sets the buffer size to use when copying between buffers.
+     * <p/>
+     * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE}
+     */
+    void setBufferSize(int bufferSize);
+
+    int getBufferSize();
+
+    /**
+     * Sets the name of the cipher to use for encrypting data when spooling to disk.
+     * <p/>
+     * By default the data is not encrypted.
+     */
+    void setSpoolChiper(String chiper);
+
+    String getSpoolChiper();
+
+    /**
+     * Whether to remove the temporary directory when stopping.
+     * <p/>
+     * This option defaults to <tt>true</tt>.
+     */
+    void setRemoveTemporaryDirectoryWhenStopping(boolean remove);
+
+    boolean isRemoveTemporaryDirectoryWhenStopping();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
index c381e90..76d1e5f 100644
--- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
@@ -44,9 +44,10 @@ import org.slf4j.LoggerFactory;
  * @version 
  */
 public final class IOHelper {
-    
+
+    public static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+
     private static final transient Logger LOG = LoggerFactory.getLogger(IOHelper.class);
-    private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
     private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
 
     private IOHelper() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 46316f6..91729f1 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
@@ -38,11 +39,18 @@ public class CachedOutputStreamTest extends ContextTestSupport {
 
     private Exchange exchange;
 
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.setStreamCaching(true);
+        context.getStreamCachingStrategy().setTemporaryDirectory("target/cachedir");
+        context.getStreamCachingStrategy().setSpoolThreshold(16);
+        return context;
+    }
+
     protected void setUp() throws Exception {
         super.setUp();
-        
-        context.getProperties().put(CachedOutputStream.TEMP_DIR, "target/cachedir");
-        context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+
         deleteDirectory("target/cachedir");
         createDirectory("target/cachedir");
 
@@ -51,6 +59,11 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         exchange.setUnitOfWork(uow);
     }
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
     private static String toString(InputStream input) throws IOException {
         BufferedReader reader = IOHelper.buffered(new InputStreamReader(input));
         CollectionStringBuffer builder = new CollectionStringBuffer();
@@ -63,7 +76,9 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         }
     }
 
-    public void testCacheStreamToFileAndCloseStream() throws IOException {       
+    public void testCacheStreamToFileAndCloseStream() throws Exception {
+        context.start();
+
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
         
@@ -96,9 +111,12 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         IOHelper.close(cos);
     }
     
-    public void testCacheStreamToFileAndCloseStreamEncrypted() throws IOException {
+    public void testCacheStreamToFileAndCloseStreamEncrypted() throws Exception {
         // set some stream or 8-bit block cipher transformation name
-        exchange.getContext().getProperties().put(CachedOutputStream.CIPHER_TRANSFORMATION, "RC4");
+        context.getStreamCachingStrategy().setSpoolChiper("RC4");
+
+        context.start();
+
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
         cos.flush();
@@ -137,7 +155,9 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         IOHelper.close(cos);
     }
 
-    public void testCacheStreamToFileCloseStreamBeforeDone() throws IOException {
+    public void testCacheStreamToFileCloseStreamBeforeDone() throws Exception {
+        context.start();
+
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
 
@@ -163,8 +183,10 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         IOHelper.close(cos);
     }
     
-    public void testCacheStreamToMemory() throws IOException {
-        context.getProperties().put(CachedOutputStream.THRESHOLD, "1024");
+    public void testCacheStreamToMemory() throws Exception {
+        context.getStreamCachingStrategy().setSpoolThreshold(1024);
+
+        context.start();
 
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -181,9 +203,11 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         IOHelper.close(cos);
     }
 
-    public void testCacheStreamToMemoryAsDiskIsdisabled() throws IOException {
+    public void testCacheStreamToMemoryAsDiskIsDisabled() throws Exception {
         // -1 disables disk based cache
-        context.getProperties().put(CachedOutputStream.THRESHOLD, "-1");
+        context.getStreamCachingStrategy().setSpoolThreshold(-1);
+
+        context.start();
 
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -202,14 +226,16 @@ public class CachedOutputStreamTest extends ContextTestSupport {
         IOHelper.close(cos);
     }
     
-    public void testCachedOutputStreamCustomBufferSize() throws IOException {
+    public void testCachedOutputStreamCustomBufferSize() throws Exception {
         // double the default buffer size
-        context.getProperties().put(CachedOutputStream.BUFFER_SIZE, "4096");
-        
+        context.getStreamCachingStrategy().setBufferSize(8192);
+
+        context.start();
+
         CachedOutputStream cos = new CachedOutputStream(exchange);
         cos.write(TEST_STRING.getBytes("UTF-8"));
 
-        assertEquals("we should have a custom buffer size", cos.getBufferSize(), 4096);
+        assertEquals("we should have a custom buffer size", cos.getBufferSize(), 8192);
         
         // make sure things still work after custom buffer size set
         File file = new File("target/cachedir");

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
index aeb710f..b792d09 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
@@ -19,9 +19,6 @@ package org.apache.camel.converter.stream;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
 import javax.xml.transform.Source;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamSource;
@@ -51,6 +48,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
     
     public void testConvertToStreamCache() throws Exception {
+        context.start();
+
         ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes());
         StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange);
         String message = exchange.getContext().getTypeConverter().convertTo(String.class, streamCache);
@@ -59,6 +58,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
 
     public void testConvertToStreamCacheStreamSource() throws Exception {
+        context.start();
+
         StreamSource source = new StreamSource(getTestFileStream());
         StreamCache cache = StreamCacheConverter.convertToStreamCache(source, exchange);
         //assert re-readability of the cached StreamSource
@@ -69,6 +70,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
 
     public void testConvertToStreamCacheInputStream() throws Exception {
+        context.start();
+
         InputStream is = getTestFileStream();
         InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange);
         //assert re-readability of the cached InputStream
@@ -77,10 +80,10 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
     
     public void testConvertToStreamCacheInputStreamWithFileCache() throws Exception {
-        // set up the properties
-        Map<String, String> properties = new HashMap<String, String>();
-        properties.put(CachedOutputStream.THRESHOLD, "1");
-        exchange.getContext().setProperties(properties);
+        exchange.getContext().getStreamCachingStrategy().setSpoolThreshold(1);
+
+        context.start();
+
         InputStream is = getTestFileStream();
         InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange);
         assertNotNull(IOConverter.toString(cache, null));
@@ -96,6 +99,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
 
     public void testConvertToSerializable() throws Exception {
+        context.start();
+
         InputStream is = getTestFileStream();
         StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange);
         Serializable ser = StreamCacheConverter.convertToSerializable(cache, exchange);
@@ -103,6 +108,8 @@ public class StreamCacheConverterTest extends ContextTestSupport {
     }
 
     public void testConvertToByteArray() throws Exception {
+        context.start();
+
         InputStream is = getTestFileStream();
         StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange);
         byte[] bytes = StreamCacheConverter.convertToByteArray(cache, exchange);
@@ -114,4 +121,9 @@ public class StreamCacheConverterTest extends ContextTestSupport {
         assertNotNull("Should have found the file: " + TEST_FILE + " on the classpath", answer);
         return answer;
     }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
index 9f71441..d17410e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
@@ -48,8 +48,8 @@ public class SplitterStreamCacheTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 //ensure stream is spooled to disk
-                getContext().getProperties().put(CachedOutputStream.TEMP_DIR, "target/tmp");
-                getContext().getProperties().put(CachedOutputStream.THRESHOLD, "1");
+                context.getStreamCachingStrategy().setTemporaryDirectory("target/tmp");
+                context.getStreamCachingStrategy().setSpoolThreshold(-1);
 
                 from("seda:parallel?concurrentConsumers=5").streamCaching()
                     .split(XPathBuilder.xpath("//person/city"))

http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index d521e7d..1454233 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -79,6 +79,7 @@ import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanFilter;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.UuidGenerator;
@@ -789,5 +790,10 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
             LOG.info("Using custom NodeIdFactory: " + nodeIdFactory);
             getContext().setNodeIdFactory(nodeIdFactory);
         }
+        StreamCachingStrategy streamCachingStrategy = getBeanForType(StreamCachingStrategy.class);
+        if (streamCachingStrategy != null) {
+            LOG.info("Using custom StreamCachingStrategy: " + streamCachingStrategy);
+            getContext().setStreamCachingStrategy(streamCachingStrategy);
+        }
     }
 }