You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/01 07:33:13 UTC
[camel] 01/03: CAMEL-19670: camel-core - useOriginalMessage should do like stream-ca… (#10934)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch sc-original-3
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 807850535aa865aefbd67975ac93b966aa8672fe
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 1 08:00:18 2023 +0200
CAMEL-19670: camel-core - useOriginalMessage should do like stream-ca… (#10934)
CAMEL-19670: camel-core - useOriginalMessage should do like stream-caching advice making defensive copy that is safe to re-read. Added unit test from Bartosz Popiela into camel-zipfile.
---
.../zipfile/ZipSplitterUseOriginalMessageTest.java | 92 +++++++++++++++++++
.../camel/impl/engine/CamelInternalProcessor.java | 57 +-----------
.../camel/impl/engine/DefaultUnitOfWork.java | 24 ++++-
.../camel/impl/engine/StreamCachingHelper.java | 90 ++++++++++++++++++
.../apache/camel/model/OnCompletionDefinition.java | 54 ++++++++++-
.../apache/camel/model/OnExceptionDefinition.java | 12 ++-
.../DefaultErrorHandlerDefinition.java | 18 ++--
...OnExceptionUseOriginalMessageStreamTwoTest.java | 102 +++++++++++++++++++++
8 files changed, 380 insertions(+), 69 deletions(-)
diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java
new file mode 100644
index 00000000000..48a6662f299
--- /dev/null
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.zipfile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.dataformat.ZipFileDataFormat;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class ZipSplitterUseOriginalMessageTest extends CamelTestSupport {
+
+ private List<String> list1 = new ArrayList<>();
+ private List<String> list2 = new ArrayList<>();
+
+ @Test
+ public void testSplitter() throws InterruptedException {
+ MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
+ processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", "another_chiau", "another_hi");
+ MockEndpoint.assertIsSatisfied(context);
+
+ // should be the same
+ Arrays.deepEquals(list1.toArray(), list2.toArray());
+ }
+
+ private org.apache.camel.model.dataformat.ZipFileDataFormat multiEntryZipFormat() {
+ var zipFormat = new ZipFileDataFormat();
+ zipFormat.setUsingIterator("true");
+ return zipFormat;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ onException(Exception.class)
+ // turn on original message which caused CAMEL-19670
+ .useOriginalMessage();
+
+ // Unzip file and Split it according to FileEntry
+ from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true")
+ .log("Start processing big file: ${header.CamelFileName}")
+ .unmarshal(multiEntryZipFormat())
+ .split(bodyAs(Iterator.class)).streaming()
+ .log("${body}")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // should be able to read the stream
+ String s = exchange.getMessage().getBody(String.class);
+ list1.add(s);
+ }
+ })
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // should be able to read the stream again
+ String s = exchange.getMessage().getBody(String.class);
+ list2.add(s);
+ }
+ })
+ .to("mock:processZipEntry")
+ .end()
+ .log("Done processing big file: ${header.CamelFileName}");
+ }
+ };
+
+ }
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 5347504f47b..febb99ce644 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.StatefulService;
import org.apache.camel.StreamCache;
-import org.apache.camel.StreamCacheException;
import org.apache.camel.impl.debugger.BacklogDebugger;
import org.apache.camel.impl.debugger.BacklogTracer;
import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
@@ -66,7 +65,6 @@ import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -918,62 +916,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
@Override
public StreamCache before(Exchange exchange) throws Exception {
- // check if body is already cached
- try {
- Object body = exchange.getIn().getBody();
- if (body == null) {
- return null;
- } else if (body instanceof StreamCache) {
- StreamCache sc = (StreamCache) body;
- // reset so the cache is ready to be used before processing
- sc.reset();
- return sc;
- }
- } catch (Exception e) {
- // lets allow Camels error handler to deal with stream cache failures
- StreamCacheException tce = new StreamCacheException(null, e);
- exchange.setException(tce);
- // because this is stream caching error then we cannot use redelivery as the message body is corrupt
- // so mark as redelivery exhausted
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
- }
- // check if we somewhere failed due to a stream caching exception
- Throwable cause = exchange.getException();
- if (cause == null) {
- cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class);
- }
- boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
- if (!failed) {
- boolean disabled = exchange.adapt(ExtendedExchange.class).isStreamCacheDisabled();
- if (disabled) {
- return null;
- }
- try {
- // cache the body and if we could do that replace it as the new body
- StreamCache sc = strategy.cache(exchange);
- if (sc != null) {
- exchange.getIn().setBody(sc);
- }
- return sc;
- } catch (Exception e) {
- // lets allow Camels error handler to deal with stream cache failures
- StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e);
- exchange.setException(tce);
- // because this is stream caching error then we cannot use redelivery as the message body is corrupt
- // so mark as redelivery exhausted
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
- }
- }
- return null;
+ return StreamCachingHelper.convertToStreamCache(strategy, exchange, exchange.getIn());
}
@Override
public void after(Exchange exchange, StreamCache sc) throws Exception {
- Object body = exchange.getMessage().getBody();
- if (body instanceof StreamCache) {
- // reset so the cache is ready to be reused after processing
- ((StreamCache) body).reset();
- }
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getMessage());
}
@Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index e82e4e85bb8..5c9a6ffae83 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -102,6 +102,21 @@ public class DefaultUnitOfWork implements UnitOfWork {
}
}
+ private boolean isStreamCacheInUse(Exchange exchange) {
+ boolean inUse = streamCachingStrategy.isEnabled();
+ if (inUse) {
+ // the original route (from route) may have disabled stream caching
+ String rid = exchange.getFromRouteId();
+ if (rid != null) {
+ Route route = exchange.getContext().getRoute(rid);
+ if (route != null) {
+ inUse = route.isStreamCaching() != null && route.isStreamCaching();
+ }
+ }
+ }
+ return inUse;
+ }
+
private void doOnPrepare(Exchange exchange) {
// unit of work is reused, so setup for this exchange
this.exchange = exchange;
@@ -119,11 +134,14 @@ public class DefaultUnitOfWork implements UnitOfWork {
if (this.originalInMessage instanceof MessageSupport) {
((MessageSupport) this.originalInMessage).setExchange(exchange);
}
- if (streamCachingStrategy.isEnabled()) {
- // if the input body is streaming we need to cache it, so we can access the original input message
- StreamCache cache = context.getStreamCachingStrategy().cache(this.originalInMessage);
+ if (streamCachingStrategy.isEnabled() && isStreamCacheInUse(exchange)) {
+ // if the input body is streaming we need to cache it, so we can access the original input message (like stream caching advice does)
+ StreamCache cache
+ = StreamCachingHelper.convertToStreamCache(streamCachingStrategy, exchange, this.originalInMessage);
if (cache != null) {
this.originalInMessage.setBody(cache);
+ // replace original incoming message with stream cache
+ this.exchange.getIn().setBody(cache);
}
}
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
new file mode 100644
index 00000000000..63fba73c035
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.Message;
+import org.apache.camel.StreamCache;
+import org.apache.camel.StreamCacheException;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Helper for {@link org.apache.camel.StreamCache} in Camel route engine.
+ */
+final class StreamCachingHelper {
+
+ private StreamCachingHelper() {
+ }
+
+ public static StreamCache convertToStreamCache(StreamCachingStrategy strategy, Exchange exchange, Message message) {
+ // check if body is already cached
+ try {
+ Object body = message.getBody();
+ if (body == null) {
+ return null;
+ } else if (body instanceof StreamCache) {
+ StreamCache sc = (StreamCache) body;
+ // reset so the cache is ready to be used before processing
+ sc.reset();
+ return sc;
+ }
+ } catch (Exception e) {
+ // lets allow Camels error handler to deal with stream cache failures
+ StreamCacheException tce = new StreamCacheException(null, e);
+ exchange.setException(tce);
+ // because this is stream caching error then we cannot use redelivery as the message body is corrupt
+ // so mark as redelivery exhausted
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
+ }
+ // check if we somewhere failed due to a stream caching exception
+ Throwable cause = exchange.getException();
+ if (cause == null) {
+ cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class);
+ }
+ return tryStreamCache(strategy, exchange, message, cause);
+ }
+
+ private static StreamCache tryStreamCache(
+ StreamCachingStrategy strategy, Exchange exchange, Message inMessage, Throwable cause) {
+ final boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
+ if (!failed) {
+ boolean disabled = exchange.getExchangeExtension().isStreamCacheDisabled();
+ if (disabled) {
+ return null;
+ }
+ try {
+ // cache the body and if we could do that replace it as the new body
+ StreamCache sc = strategy.cache(exchange);
+ if (sc != null) {
+ inMessage.setBody(sc);
+ }
+ return sc;
+ } catch (Exception e) {
+ // lets allow Camels error handler to deal with stream cache failures
+ StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e);
+ exchange.setException(tce);
+ // because this is stream caching error then we cannot use redelivery as the message body is corrupt
+ // so mark as redelivery exhausted
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 3826478429d..a7875995b02 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -215,8 +215,10 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
* Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
@@ -231,12 +233,42 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
* By default this feature is off.
*
* @return the builder
+ * @deprecated use {@link #useOriginalMessage()}
*/
+ @Deprecated
public OnCompletionDefinition useOriginalBody() {
setUseOriginalMessage(Boolean.toString(true));
return this;
}
+ /**
+ * Will use the original input message when an {@link org.apache.camel.Exchange} for this on completion.
+ * <p/>
+ * The original input message is defensively copied, and the copied message body is converted to
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * able to re-read when accessed later.
+ * <p/>
+ * <b>Important:</b> The original input means the input message that are bounded by the current
+ * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they
+ * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints
+ * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+ * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+ * for the messages in their sub-route (eg the split message); however these EIPs have an option named
+ * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and
+ * therefore use the parent original message.
+ * <p/>
+ * By default this feature is off.
+ *
+ * @return the builder
+ */
+ public OnCompletionDefinition useOriginalMessage() {
+ setUseOriginalMessage(Boolean.toString(true));
+ return this;
+ }
+
/**
* To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel
* processing is automatic implied, and you do not have to enable that option as well.
@@ -346,7 +378,25 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
/**
* Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion.
* <p/>
+ * The original input message is defensively copied, and the copied message body is converted to
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * able to re-read when accessed later.
+ * <p/>
+ * <b>Important:</b> The original input means the input message that are bounded by the current
+ * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they
+ * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints
+ * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+ * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+ * for the messages in their sub-route (eg the split message); however these EIPs have an option named
+ * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and
+ * therefore use the parent original message.
+ * <p/>
* By default this feature is off.
+ *
+ * @return the builder
*/
public void setUseOriginalMessage(String useOriginalMessage) {
this.useOriginalMessage = useOriginalMessage;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
index e61d92dc1a9..26db6bd68c1 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
@@ -669,8 +669,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
* original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
@@ -711,8 +713,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
* original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
index 74ab640435e..a46c3368cc2 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
@@ -199,8 +199,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
* original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
@@ -241,8 +243,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
* original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
@@ -697,8 +701,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
* original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/>
* The original input message is defensively copied, and the copied message body is converted to
- * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
- * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+ * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+ * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+ * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
* able to re-read when accessed later.
* <p/>
* <b>Important:</b> The original input means the input message that are bounded by the current
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java
new file mode 100644
index 00000000000..86045242b3d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.onexception;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.StreamCache;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class OnExceptionUseOriginalMessageStreamTwoTest extends ContextTestSupport {
+
+ private final List<String> list1 = new ArrayList<>();
+ private final List<String> list2 = new ArrayList<>();
+
+ @Test
+ void convertUseOriginalMessage() {
+ String data = "data";
+ InputStream is = new ByteArrayInputStream(data.getBytes());
+ template.sendBody("direct:start", is);
+
+ Assertions.assertEquals(list1.get(0), list2.get(0));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class)
+ .useOriginalMessage()
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Assertions.assertTrue(exchange.getMessage().getBody() instanceof StreamCache);
+ String s = exchange.getMessage().getBody(String.class);
+ list1.add(s);
+ }
+ })
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Assertions.assertTrue(exchange.getMessage().getBody() instanceof StreamCache);
+ String s = exchange.getMessage().getBody(String.class);
+ list2.add(s);
+ }
+ })
+ .handled(true);
+
+ from("direct:start")
+ .unmarshal(new MyDataFormat());
+ }
+ };
+ }
+
+ public static class MyDataFormatException extends Exception {
+
+ public MyDataFormatException(String message) {
+ super(message);
+ }
+ }
+
+ public class MyDataFormat extends ServiceSupport implements DataFormat {
+
+ @Override
+ public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+ // noop
+ }
+
+ @Override
+ public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+ // simulate reading the entire stream so its not re-readable later
+ String s = IOConverter.toString(stream, exchange);
+ throw new MyDataFormatException(s);
+ }
+ }
+}