You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/20 11:13:03 UTC
[4/4] git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI
to make it easier to configure and allow 3rd party to plugin custom
strategies. Work in progress.
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/90e08bb3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/90e08bb3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/90e08bb3
Branch: refs/heads/master
Commit: 90e08bb3098f74c75f8c296c92d327780ee7a5a7
Parents: 8589082
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jul 20 10:11:56 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jul 20 11:12:42 2013 +0200
----------------------------------------------------------------------
.../stream/ByteArrayInputStreamCache.java | 49 +++++++++++++++++
.../converter/stream/CachedOutputStream.java | 2 +-
.../camel/converter/stream/CipherPair.java | 30 +++++------
.../converter/stream/FileInputStreamCache.java | 6 ++-
.../converter/stream/InputStreamCache.java | 5 +-
.../stream/MarkableInputStreamCache.java | 55 --------------------
.../camel/converter/stream/ReaderCache.java | 7 ++-
.../camel/converter/stream/SourceCache.java | 5 +-
.../converter/stream/StreamCacheConverter.java | 16 +++---
.../converter/stream/StreamSourceCache.java | 4 +-
.../processor/StreamCachingResetProcessor.java | 35 -------------
.../processor/interceptor/DefaultChannel.java | 1 -
.../processor/interceptor/StreamCaching.java | 2 +
.../interceptor/StreamCachingInterceptor.java | 2 +
.../StreamCachingResetProcessor.java | 40 ++++++++++++++
.../stream/ByteArrayInputStreamCacheTest.java | 42 +++++++++++++++
.../stream/CachedOutputStreamTest.java | 4 +-
.../converter/stream/InputStreamCacheTest.java | 41 +++++++++++++++
.../stream/MarkableInputStreamCacheTest.java | 41 ---------------
.../OnExceptionUseOriginalMessageTest.java | 4 +-
20 files changed, 214 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
new file mode 100644
index 0000000..120fc99
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * A {@link StreamCache} for {@link java.io.ByteArrayInputStream}
+ */
+public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache {
+
+ public ByteArrayInputStreamCache(ByteArrayInputStream in) {
+ super(in);
+ }
+
+ public void reset() {
+ try {
+ super.reset();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+
+ @Override
+ public void writeTo(OutputStream os) throws IOException {
+ IOHelper.copyAndCloseInput(in, os);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index c04d06e..b25046b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -180,7 +180,7 @@ public class CachedOutputStream extends OutputStream {
if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
- return new MarkableInputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
+ return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
} else {
throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
index 5f44d74..159699b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.converter.stream;
import java.security.GeneralSecurityException;
@@ -29,9 +28,9 @@ import javax.crypto.spec.IvParameterSpec;
* A class to hold a pair of encryption and decryption ciphers.
*/
public class CipherPair {
- private String transformation;
- private Cipher enccipher;
- private Cipher deccipher;
+ private final String transformation;
+ private final Cipher enccipher;
+ private final Cipher deccipher;
public CipherPair(String transformation) throws GeneralSecurityException {
this.transformation = transformation;
@@ -43,20 +42,15 @@ public class CipherPair {
} else {
a = transformation;
}
- try {
- KeyGenerator keygen = KeyGenerator.getInstance(a);
- keygen.init(new SecureRandom());
- Key key = keygen.generateKey();
- enccipher = Cipher.getInstance(transformation);
- deccipher = Cipher.getInstance(transformation);
- enccipher.init(Cipher.ENCRYPT_MODE, key);
- final byte[] ivp = enccipher.getIV();
- deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
- } catch (GeneralSecurityException e) {
- enccipher = null;
- deccipher = null;
- throw e;
- }
+
+ KeyGenerator keygen = KeyGenerator.getInstance(a);
+ keygen.init(new SecureRandom());
+ Key key = keygen.generateKey();
+ enccipher = Cipher.getInstance(transformation);
+ deccipher = Cipher.getInstance(transformation);
+ enccipher.init(Cipher.ENCRYPT_MODE, key);
+ final byte[] ivp = enccipher.getIV();
+ deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
}
public String getTransformation() {
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index 558fda0..10f8b22 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -26,14 +26,16 @@ import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
-
import javax.crypto.CipherInputStream;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.util.IOHelper;
-public class FileInputStreamCache extends InputStream implements StreamCache {
+/**
+ * A {@link StreamCache} for {@link File}s
+ */
+public final class FileInputStreamCache extends InputStream implements StreamCache {
private InputStream stream;
private File file;
private CipherPair ciphers;
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index 4e596e0..340675b 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -23,10 +23,9 @@ import java.io.OutputStream;
import org.apache.camel.StreamCache;
/**
- * @deprecated use {@link MarkableInputStreamCache} instead.
+ * A {@link StreamCache} for caching using an in-memory byte array.
*/
-@Deprecated
-public class InputStreamCache extends ByteArrayInputStream implements StreamCache {
+public final class InputStreamCache extends ByteArrayInputStream implements StreamCache {
public InputStreamCache(byte[] data) {
super(data);
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
deleted file mode 100644
index 048786b..0000000
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/MarkableInputStreamCache.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.converter.stream;
-
-import java.io.ByteArrayInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.camel.StreamCache;
-import org.apache.camel.util.IOHelper;
-
-/**
- * A {@link StreamCache} for {@link InputStream} that supports mark.
- */
-public final class MarkableInputStreamCache extends FilterInputStream implements StreamCache {
-
- public MarkableInputStreamCache(byte[] data) {
- this(new ByteArrayInputStream(data));
- mark(data.length);
- }
-
- public MarkableInputStreamCache(InputStream in) {
- super(in);
- }
-
- @Override
- public void reset() {
- try {
- in.reset();
- } catch (IOException e) {
- // ignore
- }
- }
-
- @Override
- public void writeTo(OutputStream os) throws IOException {
- IOHelper.copyAndCloseInput(in, os);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index dbb1023..381f08f 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -25,13 +25,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link org.apache.camel.StreamCache} implementation for Cache the Reader {@link java.io.Reader}s
+ * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s
*/
public class ReaderCache extends StringReader implements StreamCache {
private static final transient Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
-
- private String data;
+ private final String data;
public ReaderCache(String data) {
super(data);
@@ -47,7 +46,7 @@ public class ReaderCache extends StringReader implements StreamCache {
try {
super.reset();
} catch (IOException e) {
- LOG.warn("Cannot reset cache", e);
+ // ignore
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 837f939..9e23f51 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -26,13 +26,10 @@ import org.apache.camel.util.IOHelper;
/**
* {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.StringSource}s
*/
-public class SourceCache extends StringSource implements StreamCache {
+public final class SourceCache extends StringSource implements StreamCache {
private static final long serialVersionUID = 1L;
- public SourceCache() {
- }
-
public SourceCache(String data) {
super(data);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
index 0282565..b138768 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.converter.stream;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -69,14 +70,15 @@ public final class StreamCacheConverter {
}
@Converter
+ public static StreamCache convertToStreamCache(ByteArrayInputStream stream, Exchange exchange) throws IOException {
+ return new ByteArrayInputStreamCache(stream);
+ }
+
+ @Converter
public static StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
- if (stream.markSupported()) {
- return new MarkableInputStreamCache(stream);
- } else {
- CachedOutputStream cos = new CachedOutputStream(exchange);
- IOHelper.copyAndCloseInput(stream, cos);
- return cos.getStreamCache();
- }
+ CachedOutputStream cos = new CachedOutputStream(exchange);
+ IOHelper.copyAndCloseInput(stream, cos);
+ return cos.getStreamCache();
}
@Converter
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index 62411cd..11bafc2 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -28,9 +28,9 @@ import org.apache.camel.StreamCache;
import org.apache.camel.util.IOHelper;
/**
- * {@link org.apache.camel.StreamCache} implementation for Cache the StreamSource {@link javax.xml.transform.stream.StreamSource}s
+ * A {@link org.apache.camel.StreamCache} for {@link javax.xml.transform.stream.StreamSource}s
*/
-public class StreamSourceCache extends StreamSource implements StreamCache {
+public final class StreamSourceCache extends StreamSource implements StreamCache {
private final InputStream stream;
private final StreamCache streamCache;
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
deleted file mode 100644
index 5deefb4..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamCachingResetProcessor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.MessageHelper;
-
-public class StreamCachingResetProcessor extends DelegateAsyncProcessor {
-
- public StreamCachingResetProcessor(Processor processor) {
- super(processor);
- }
-
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- MessageHelper.resetStreamCache(exchange.getIn());
- return super.process(exchange, callback);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index ab2d3e4..1ca4064 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -35,7 +35,6 @@ import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RouteDefinitionHelper;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
-import org.apache.camel.processor.StreamCachingResetProcessor;
import org.apache.camel.processor.WrapProcessor;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.LifecycleStrategy;
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
index 926f258..ed4754d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
@@ -25,6 +25,8 @@ import org.apache.camel.spi.InterceptStrategy;
/**
* {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
+ *
+ * @deprecated no longer in use, will be removed in next Camel release.
*/
@Deprecated
public final class StreamCaching implements InterceptStrategy {
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
index 374a948..ddf3a04 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
@@ -26,6 +26,8 @@ import org.apache.camel.util.MessageHelper;
/**
* An interceptor that converts streams messages into a re-readable format
* by wrapping the stream into a {@link StreamCache}.
+ *
+ * @deprecated no longer in use, will be removed in next Camel release.
*/
@Deprecated
public class StreamCachingInterceptor extends DelegateAsyncProcessor {
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
new file mode 100644
index 0000000..57b5c96
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.util.MessageHelper;
+
+/**
+ * {@link Processor} to reset {@link org.apache.camel.StreamCache} to ensure the stream
+ * is ready and re-readable for processing.
+ */
+public class StreamCachingResetProcessor extends DelegateAsyncProcessor {
+
+ public StreamCachingResetProcessor(Processor processor) {
+ super(processor);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ MessageHelper.resetStreamCache(exchange.getIn());
+ return super.process(exchange, callback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
new file mode 100644
index 0000000..c5e3210
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/ByteArrayInputStreamCacheTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * @version
+ */
+public class ByteArrayInputStreamCacheTest extends ContextTestSupport {
+
+ public void testByteArrayInputStream() throws Exception {
+ ByteArrayInputStreamCache cache = new ByteArrayInputStreamCache(new ByteArrayInputStream("<foo>bar</foo>".getBytes()));
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ cache.writeTo(bos);
+
+ String s = context.getTypeConverter().convertTo(String.class, bos);
+ assertEquals("<foo>bar</foo>", s);
+
+ IOHelper.close(cache, bos);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 0698100..a705e1a 100644
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -196,7 +196,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
assertEquals("we should have no temp file", files.length, 0);
StreamCache cache = cos.getStreamCache();
- assertTrue("Should get the InputStreamCache", cache instanceof MarkableInputStreamCache);
+ assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
String temp = IOConverter.toString((InputStream)cache, null);
assertEquals("Cached a wrong file", temp, TEST_STRING);
@@ -217,7 +217,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
assertEquals("we should have no temp file", files.length, 0);
StreamCache cache = cos.getStreamCache();
- assertTrue("Should get the InputStreamCache", cache instanceof MarkableInputStreamCache);
+ assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
String temp = IOConverter.toString((InputStream)cache, null);
assertEquals("Cached a wrong file", temp, TEST_STRING);
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
new file mode 100644
index 0000000..5f02265
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/converter/stream/InputStreamCacheTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * @version
+ */
+public class InputStreamCacheTest extends ContextTestSupport {
+
+ public void testInputStreamCache() throws Exception {
+ InputStreamCache cache = new InputStreamCache("<foo>bar</foo>".getBytes());
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ cache.writeTo(bos);
+
+ String s = context.getTypeConverter().convertTo(String.class, bos);
+ assertEquals("<foo>bar</foo>", s);
+
+ IOHelper.close(cache, bos);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
deleted file mode 100644
index b8e778a..0000000
--- a/camel-core/src/test/java/org/apache/camel/converter/stream/MarkableInputStreamCacheTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.converter.stream;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.util.IOHelper;
-
-/**
- * @version
- */
-public class MarkableInputStreamCacheTest extends ContextTestSupport {
-
- public void testInputStreamCache() throws Exception {
- MarkableInputStreamCache cache = new MarkableInputStreamCache("<foo>bar</foo>".getBytes());
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- cache.writeTo(bos);
-
- String s = context.getTypeConverter().convertTo(String.class, bos);
- assertEquals("<foo>bar</foo>", s);
-
- IOHelper.close(cache, bos);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/90e08bb3/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java b/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
index 25eb746..010fe64 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageTest.java
@@ -21,7 +21,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.converter.stream.MarkableInputStreamCache;
+import org.apache.camel.converter.stream.InputStreamCache;
/**
* @version
@@ -50,7 +50,7 @@ public class OnExceptionUseOriginalMessageTest extends ContextTestSupport {
getMockEndpoint("mock:end").expectedMessageCount(1);
getMockEndpoint("mock:end").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class);
- MarkableInputStreamCache cache = new MarkableInputStreamCache(TEST_STRING.getBytes());
+ InputStreamCache cache = new InputStreamCache(TEST_STRING.getBytes());
template.sendBody("direct:a", cache);