You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/20 11:13:03 UTC

[4/4] git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.

CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/90e08bb3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/90e08bb3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/90e08bb3

Branch: refs/heads/master
Commit: 90e08bb3098f74c75f8c296c92d327780ee7a5a7
Parents: 8589082
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jul 20 10:11:56 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jul 20 11:12:42 2013 +0200

----------------------------------------------------------------------
 .../stream/ByteArrayInputStreamCache.java       | 49 +++++++++++++++++
 .../converter/stream/CachedOutputStream.java    |  2 +-
 .../camel/converter/stream/CipherPair.java      | 30 +++++------
 .../converter/stream/FileInputStreamCache.java  |  6 ++-
 .../converter/stream/InputStreamCache.java      |  5 +-
 .../stream/MarkableInputStreamCache.java        | 55 --------------------
 .../camel/converter/stream/ReaderCache.java     |  7 ++-
 .../camel/converter/stream/SourceCache.java     |  5 +-
 .../converter/stream/StreamCacheConverter.java  | 16 +++---
 .../converter/stream/StreamSourceCache.java     |  4 +-
 .../processor/StreamCachingResetProcessor.java  | 35 -------------
 .../processor/interceptor/DefaultChannel.java   |  1 -
 .../processor/interceptor/StreamCaching.java    |  2 +
 .../interceptor/StreamCachingInterceptor.java   |  2 +
 .../StreamCachingResetProcessor.java            | 40 ++++++++++++++
 .../stream/ByteArrayInputStreamCacheTest.java   | 42 +++++++++++++++
 .../stream/CachedOutputStreamTest.java          |  4 +-
 .../converter/stream/InputStreamCacheTest.java  | 41 +++++++++++++++
 .../stream/MarkableInputStreamCacheTest.java    | 41 ---------------
 .../OnExceptionUseOriginalMessageTest.java      |  4 +-
 20 files changed, 214 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
new file mode 100644
index 0000000..120fc99
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * A {@link StreamCache} for {@link java.io.ByteArrayInputStream}
+ */
+public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache {
+
+    public ByteArrayInputStreamCache(ByteArrayInputStream in) {
+        super(in);
+    }
+
+    public void reset() {
+        try {
+            super.reset();
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+
+    @Override
+    public void writeTo(OutputStream os) throws IOException {
+        IOHelper.copyAndCloseInput(in, os);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index c04d06e..b25046b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -180,7 +180,7 @@ public class CachedOutputStream extends OutputStream {
 
         if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
-                return new MarkableInputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
+                return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
             } else {
                 throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
index 5f44d74..159699b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.converter.stream;
 
 import java.security.GeneralSecurityException;
@@ -29,9 +28,9 @@ import javax.crypto.spec.IvParameterSpec;
  * A class to hold a pair of encryption and decryption ciphers.
  */
 public class CipherPair {
-    private String transformation;
-    private Cipher enccipher;
-    private Cipher deccipher;
+    private final String transformation;
+    private final Cipher enccipher;
+    private final Cipher deccipher;
     
     public CipherPair(String transformation) throws GeneralSecurityException {
         this.transformation = transformation;
@@ -43,20 +42,15 @@ public class CipherPair {
         } else {
             a = transformation;
         }
-        try {
-            KeyGenerator keygen = KeyGenerator.getInstance(a);
-            keygen.init(new SecureRandom());
-            Key key = keygen.generateKey();
-            enccipher = Cipher.getInstance(transformation);
-            deccipher = Cipher.getInstance(transformation);
-            enccipher.init(Cipher.ENCRYPT_MODE, key);
-            final byte[] ivp = enccipher.getIV();
-            deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
-        } catch (GeneralSecurityException e) {
-            enccipher = null;
-            deccipher = null;
-            throw e;
-        }
+
+        KeyGenerator keygen = KeyGenerator.getInstance(a);
+        keygen.init(new SecureRandom());
+        Key key = keygen.generateKey();
+        enccipher = Cipher.getInstance(transformation);
+        deccipher = Cipher.getInstance(transformation);
+        enccipher.init(Cipher.ENCRYPT_MODE, key);
+        final byte[] ivp = enccipher.getIV();
+        deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
     }
     
     public String getTransformation() {

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index 558fda0..10f8b22 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -26,14 +26,16 @@ import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
-
 import javax.crypto.CipherInputStream;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
-public class FileInputStreamCache extends InputStream implements StreamCache {
+/**
+ * A {@link StreamCache} for {@link File}s
+ */
+public final class FileInputStreamCache extends InputStream implements StreamCache {
     private InputStream stream;
     private File file;
     private CipherPair ciphers;

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index 4e596e0..340675b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -23,10 +23,9 @@ import java.io.OutputStream;
 import org.apache.camel.StreamCache;
 
 /**
- * @deprecated  use {@link MarkableInputStreamCache} instead.
+ * A {@link StreamCache} for caching using an in-memory byte array.
  */
-@Deprecated
-public class InputStreamCache extends ByteArrayInputStream implements StreamCache {
+public final class InputStreamCache extends ByteArrayInputStream implements StreamCache {
 
     public InputStreamCache(byte[] data) {
         super(data);

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
deleted file mode 100644
index 048786b..0000000
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.converter.stream;
-
-import java.io.ByteArrayInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.camel.StreamCache;
-import org.apache.camel.util.IOHelper;
-
-/**
- * A {@link StreamCache} for {@link InputStream} that supports mark.
- */
-public final class MarkableInputStreamCache extends FilterInputStream implements StreamCache {
-
-    public MarkableInputStreamCache(byte[] data) {
-        this(new ByteArrayInputStream(data));
-        mark(data.length);
-    }
-
-    public MarkableInputStreamCache(InputStream in) {
-        super(in);
-    }
-
-    @Override
-    public void reset() {
-        try {
-            in.reset();
-        } catch (IOException e) {
-            // ignore
-        }
-    }
-
-    @Override
-    public void writeTo(OutputStream os) throws IOException {
-        IOHelper.copyAndCloseInput(in, os);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index dbb1023..381f08f 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -25,13 +25,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link org.apache.camel.StreamCache} implementation for Cache the Reader {@link java.io.Reader}s
+ * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s
  */
 public class ReaderCache extends StringReader implements StreamCache {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
-
-    private String data;
+    private final String data;
 
     public ReaderCache(String data) {
         super(data);
@@ -47,7 +46,7 @@ public class ReaderCache extends StringReader implements StreamCache {
         try {
             super.reset();
         } catch (IOException e) {
-            LOG.warn("Cannot reset cache", e);
+            // ignore
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 837f939..9e23f51 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -26,13 +26,10 @@ import org.apache.camel.util.IOHelper;
 /**
  * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.StringSource}s
  */
-public class SourceCache extends StringSource implements StreamCache {
+public final class SourceCache extends StringSource implements StreamCache {
 
     private static final long serialVersionUID = 1L;
 
-    public SourceCache() {
-    }
-
     public SourceCache(String data) {
         super(data);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
index 0282565..b138768 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.converter.stream;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -69,14 +70,15 @@ public final class StreamCacheConverter {
     }
 
     @Converter
+    public static StreamCache convertToStreamCache(ByteArrayInputStream stream, Exchange exchange) throws IOException {
+        return new ByteArrayInputStreamCache(stream);
+    }
+
+    @Converter
     public static StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
-        if (stream.markSupported()) {
-            return new MarkableInputStreamCache(stream);
-        } else {
-            CachedOutputStream cos = new CachedOutputStream(exchange);
-            IOHelper.copyAndCloseInput(stream, cos);
-            return cos.getStreamCache();
-        }
+        CachedOutputStream cos = new CachedOutputStream(exchange);
+        IOHelper.copyAndCloseInput(stream, cos);
+        return cos.getStreamCache();
     }
 
     @Converter

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index 62411cd..11bafc2 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -28,9 +28,9 @@ import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
 /**
- * {@link org.apache.camel.StreamCache} implementation for Cache the StreamSource {@link javax.xml.transform.stream.StreamSource}s
+ * A {@link org.apache.camel.StreamCache} for {@link javax.xml.transform.stream.StreamSource}s
  */
-public class StreamSourceCache extends StreamSource implements StreamCache {
+public final class StreamSourceCache extends StreamSource implements StreamCache {
 
     private final InputStream stream;
     private final StreamCache streamCache;

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
deleted file mode 100644
index 5deefb4..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.MessageHelper;
-
-public class StreamCachingResetProcessor extends DelegateAsyncProcessor {
-
-    public StreamCachingResetProcessor(Processor processor) {
-        super(processor);
-    }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        MessageHelper.resetStreamCache(exchange.getIn());
-        return super.process(exchange, callback);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index ab2d3e4..1ca4064 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -35,7 +35,6 @@ import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteDefinitionHelper;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
-import org.apache.camel.processor.StreamCachingResetProcessor;
 import org.apache.camel.processor.WrapProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
index 926f258..ed4754d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
@@ -25,6 +25,8 @@ import org.apache.camel.spi.InterceptStrategy;
 
 /**
  * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
+ *
+ * @deprecated no longer in use, will be removed in next Camel release.
  */
 @Deprecated
 public final class StreamCaching implements InterceptStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
index 374a948..ddf3a04 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
@@ -26,6 +26,8 @@ import org.apache.camel.util.MessageHelper;
 /**
  * An interceptor that converts streams messages into a re-readable format
  * by wrapping the stream into a {@link StreamCache}.
+ *
+ * @deprecated no longer in use, will be removed in next Camel release.
  */
 @Deprecated
 public class StreamCachingInterceptor extends DelegateAsyncProcessor {

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
new file mode 100644
index 0000000..57b5c96
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.util.MessageHelper;
+
+/**
+ * {@link Processor} to reset {@link org.apache.camel.StreamCache} to ensure the stream
+ * is ready and re-readable for processing.
+ */
+public class StreamCachingResetProcessor extends DelegateAsyncProcessor {
+
+    public StreamCachingResetProcessor(Processor processor) {
+        super(processor);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        MessageHelper.resetStreamCache(exchange.getIn());
+        return super.process(exchange, callback);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
new file mode 100644
index 0000000..c5e3210
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * @version 
+ */
+public class ByteArrayInputStreamCacheTest extends ContextTestSupport {
+
+    public void testByteArrayInputStream() throws Exception {
+        ByteArrayInputStreamCache cache = new ByteArrayInputStreamCache(new ByteArrayInputStream("<foo>bar</foo>".getBytes()));
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        cache.writeTo(bos);
+
+        String s = context.getTypeConverter().convertTo(String.class, bos);
+        assertEquals("<foo>bar</foo>", s);
+
+        IOHelper.close(cache, bos);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 0698100..a705e1a 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -196,7 +196,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
 
         assertEquals("we should have no temp file", files.length, 0);
         StreamCache cache = cos.getStreamCache();
-        assertTrue("Should get the InputStreamCache", cache instanceof MarkableInputStreamCache);
+        assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
         String temp = IOConverter.toString((InputStream)cache, null);
         assertEquals("Cached a wrong file", temp, TEST_STRING);
 
@@ -217,7 +217,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
 
         assertEquals("we should have no temp file", files.length, 0);
         StreamCache cache = cos.getStreamCache();
-        assertTrue("Should get the InputStreamCache", cache instanceof MarkableInputStreamCache);
+        assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
         String temp = IOConverter.toString((InputStream)cache, null);
         assertEquals("Cached a wrong file", temp, TEST_STRING);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
new file mode 100644
index 0000000..5f02265
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * @version 
+ */
+public class InputStreamCacheTest extends ContextTestSupport {
+
+    public void testInputStreamCache() throws Exception {
+        InputStreamCache cache = new InputStreamCache("<foo>bar</foo>".getBytes());
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        cache.writeTo(bos);
+
+        String s = context.getTypeConverter().convertTo(String.class, bos);
+        assertEquals("<foo>bar</foo>", s);
+
+        IOHelper.close(cache, bos);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
deleted file mode 100644
index b8e778a..0000000
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.converter.stream;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.util.IOHelper;
-
-/**
- * @version 
- */
-public class MarkableInputStreamCacheTest extends ContextTestSupport {
-
-    public void testInputStreamCache() throws Exception {
-        MarkableInputStreamCache cache = new MarkableInputStreamCache("<foo>bar</foo>".getBytes());
-
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        cache.writeTo(bos);
-
-        String s = context.getTypeConverter().convertTo(String.class, bos);
-        assertEquals("<foo>bar</foo>", s);
-
-        IOHelper.close(cache, bos);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java b/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
index 25eb746..010fe64 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
@@ -21,7 +21,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.converter.stream.MarkableInputStreamCache;
+import org.apache.camel.converter.stream.InputStreamCache;
 
 /**
  * @version 
@@ -50,7 +50,7 @@ public class OnExceptionUseOriginalMessageTest extends ContextTestSupport {
         getMockEndpoint("mock:end").expectedMessageCount(1);
         getMockEndpoint("mock:end").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class);
     
-        MarkableInputStreamCache cache = new MarkableInputStreamCache(TEST_STRING.getBytes());
+        InputStreamCache cache = new InputStreamCache(TEST_STRING.getBytes());
         
         template.sendBody("direct:a", cache);