You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/01 07:33:13 UTC

[camel] 01/03: CAMEL-19670: camel-core - useOriginalMessage should do like stream-ca… (#10934)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch sc-original-3
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 807850535aa865aefbd67975ac93b966aa8672fe
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 1 08:00:18 2023 +0200

    CAMEL-19670: camel-core - useOriginalMessage should do like stream-ca… (#10934)
    
    CAMEL-19670: camel-core - useOriginalMessage should do like stream-caching advice making defensive copy that is safe to re-read. Added unit test from Bartosz Popiela into camel-zipfile.
---
 .../zipfile/ZipSplitterUseOriginalMessageTest.java |  92 +++++++++++++++++++
 .../camel/impl/engine/CamelInternalProcessor.java  |  57 +-----------
 .../camel/impl/engine/DefaultUnitOfWork.java       |  24 ++++-
 .../camel/impl/engine/StreamCachingHelper.java     |  90 ++++++++++++++++++
 .../apache/camel/model/OnCompletionDefinition.java |  54 ++++++++++-
 .../apache/camel/model/OnExceptionDefinition.java  |  12 ++-
 .../DefaultErrorHandlerDefinition.java             |  18 ++--
 ...OnExceptionUseOriginalMessageStreamTwoTest.java | 102 +++++++++++++++++++++
 8 files changed, 380 insertions(+), 69 deletions(-)

diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java
new file mode 100644
index 00000000000..48a6662f299
--- /dev/null
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.zipfile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.dataformat.ZipFileDataFormat;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class ZipSplitterUseOriginalMessageTest extends CamelTestSupport {
+
+    private final List<String> list1 = new ArrayList<>();
+    private final List<String> list2 = new ArrayList<>();
+
+    @Test
+    public void testSplitter() throws InterruptedException {
+        MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
+        processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", "another_chiau", "another_hi");
+        MockEndpoint.assertIsSatisfied(context);
+
+        // both processors must have read the same bodies, i.e. each entry stream could be read twice
+        org.junit.jupiter.api.Assertions.assertTrue(
+                Arrays.deepEquals(list1.toArray(), list2.toArray()),
+                "Both reads of each zip entry should give the same body");
+    }
+
+    /**
+     * Creates a zip data format with usingIterator enabled, so the route can split on the resulting Iterator.
+     */
+    private ZipFileDataFormat multiEntryZipFormat() {
+        var zipFormat = new ZipFileDataFormat();
+        zipFormat.setUsingIterator("true");
+        return zipFormat;
+    }
+
+    /**
+     * Creates a processor that reads the message body as a String and records it in the given list.
+     */
+    private Processor readBodyInto(List<String> target) {
+        return (Exchange exchange) -> target.add(exchange.getMessage().getBody(String.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                onException(Exception.class)
+                        // turn on original message which caused CAMEL-19670
+                        .useOriginalMessage();
+
+                // Unzip file and Split it according to FileEntry
+                from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true")
+                        .log("Start processing big file: ${header.CamelFileName}")
+                        .unmarshal(multiEntryZipFormat())
+                        .split(bodyAs(Iterator.class)).streaming()
+                        .log("${body}")
+                        // should be able to read the stream
+                        .process(readBodyInto(list1))
+                        // should be able to read the stream again
+                        .process(readBodyInto(list2))
+                        .to("mock:processZipEntry")
+                        .end()
+                        .log("Done processing big file: ${header.CamelFileName}");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 5347504f47b..febb99ce644 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.StatefulService;
 import org.apache.camel.StreamCache;
-import org.apache.camel.StreamCacheException;
 import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.apache.camel.impl.debugger.BacklogTracer;
 import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
@@ -66,7 +65,6 @@ import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -918,62 +916,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
 
         @Override
         public StreamCache before(Exchange exchange) throws Exception {
-            // check if body is already cached
-            try {
-                Object body = exchange.getIn().getBody();
-                if (body == null) {
-                    return null;
-                } else if (body instanceof StreamCache) {
-                    StreamCache sc = (StreamCache) body;
-                    // reset so the cache is ready to be used before processing
-                    sc.reset();
-                    return sc;
-                }
-            } catch (Exception e) {
-                // lets allow Camels error handler to deal with stream cache failures
-                StreamCacheException tce = new StreamCacheException(null, e);
-                exchange.setException(tce);
-                // because this is stream caching error then we cannot use redelivery as the message body is corrupt
-                // so mark as redelivery exhausted
-                exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
-            }
-            // check if we somewhere failed due to a stream caching exception
-            Throwable cause = exchange.getException();
-            if (cause == null) {
-                cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class);
-            }
-            boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
-            if (!failed) {
-                boolean disabled = exchange.adapt(ExtendedExchange.class).isStreamCacheDisabled();
-                if (disabled) {
-                    return null;
-                }
-                try {
-                    // cache the body and if we could do that replace it as the new body
-                    StreamCache sc = strategy.cache(exchange);
-                    if (sc != null) {
-                        exchange.getIn().setBody(sc);
-                    }
-                    return sc;
-                } catch (Exception e) {
-                    // lets allow Camels error handler to deal with stream cache failures
-                    StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e);
-                    exchange.setException(tce);
-                    // because this is stream caching error then we cannot use redelivery as the message body is corrupt
-                    // so mark as redelivery exhausted
-                    exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
-                }
-            }
-            return null;
+            return StreamCachingHelper.convertToStreamCache(strategy, exchange, exchange.getIn());
         }
 
         @Override
         public void after(Exchange exchange, StreamCache sc) throws Exception {
-            Object body = exchange.getMessage().getBody();
-            if (body instanceof StreamCache) {
-                // reset so the cache is ready to be reused after processing
-                ((StreamCache) body).reset();
-            }
+            // reset cached streams so they can be read again
+            MessageHelper.resetStreamCache(exchange.getMessage());
         }
 
         @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index e82e4e85bb8..5c9a6ffae83 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -102,6 +102,21 @@ public class DefaultUnitOfWork implements UnitOfWork {
         }
     }
 
+    private boolean isStreamCacheInUse(Exchange exchange) {
+        if (!streamCachingStrategy.isEnabled()) {
+            // the stream caching strategy is not enabled at all
+            return false;
+        }
+        // the original route (from route) may have disabled stream caching
+        String fromRouteId = exchange.getFromRouteId();
+        Route fromRoute = fromRouteId != null ? exchange.getContext().getRoute(fromRouteId) : null;
+        if (fromRoute == null) {
+            // no from route to consult, so the strategy setting decides
+            return true;
+        }
+        return Boolean.TRUE.equals(fromRoute.isStreamCaching());
+    }
+
     private void doOnPrepare(Exchange exchange) {
         // unit of work is reused, so setup for this exchange
         this.exchange = exchange;
@@ -119,11 +134,14 @@ public class DefaultUnitOfWork implements UnitOfWork {
             if (this.originalInMessage instanceof MessageSupport) {
                 ((MessageSupport) this.originalInMessage).setExchange(exchange);
             }
-            if (streamCachingStrategy.isEnabled()) {
-                // if the input body is streaming we need to cache it, so we can access the original input message
-                StreamCache cache = context.getStreamCachingStrategy().cache(this.originalInMessage);
+            if (streamCachingStrategy.isEnabled() && isStreamCacheInUse(exchange)) {
+                // if the input body is streaming we need to cache it, so we can access the original input message (like stream caching advice does)
+                StreamCache cache
+                        = StreamCachingHelper.convertToStreamCache(streamCachingStrategy, exchange, this.originalInMessage);
                 if (cache != null) {
                     this.originalInMessage.setBody(cache);
+                    // replace original incoming message with stream cache
+                    this.exchange.getIn().setBody(cache);
                 }
             }
         }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
new file mode 100644
index 00000000000..63fba73c035
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.Message;
+import org.apache.camel.StreamCache;
+import org.apache.camel.StreamCacheException;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Helper for {@link org.apache.camel.StreamCache} in Camel route engine.
+ */
+final class StreamCachingHelper {
+
+    private StreamCachingHelper() {
+    }
+
+    /**
+     * Makes the message body re-readable: resets an existing {@link StreamCache}, or else caches it via the strategy.
+     *
+     * @return the stream cache, or <tt>null</tt> if the body is null or was not cached (disabled or failed)
+     */
+    public static StreamCache convertToStreamCache(StreamCachingStrategy strategy, Exchange exchange, Message message) {
+        // check if body is already cached
+        try {
+            Object body = message.getBody();
+            if (body == null) {
+                return null;
+            } else if (body instanceof StreamCache) {
+                StreamCache sc = (StreamCache) body;
+                // reset so the cache is ready to be used before processing
+                sc.reset();
+                return sc;
+            }
+        } catch (Exception e) {
+            onStreamCacheFailure(exchange, null, e);
+        }
+        // check if we somewhere failed due to a stream caching exception
+        Throwable cause = exchange.getException();
+        if (cause == null) {
+            cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class);
+        }
+        return tryStreamCache(strategy, exchange, message, cause);
+    }
+
+    private static StreamCache tryStreamCache(
+            StreamCachingStrategy strategy, Exchange exchange, Message inMessage, Throwable cause) {
+        final boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
+        if (failed || exchange.adapt(org.apache.camel.ExtendedExchange.class).isStreamCacheDisabled()) {
+            return null;
+        }
+        try {
+            // cache the body and if we could do that replace it as the new body
+            StreamCache sc = strategy.cache(exchange);
+            if (sc != null) {
+                inMessage.setBody(sc);
+            }
+            return sc;
+        } catch (Exception e) {
+            onStreamCacheFailure(exchange, exchange.getMessage().getBody(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Lets Camel's error handler deal with the stream cache failure; no redelivery as the message body is corrupt.
+     */
+    private static void onStreamCacheFailure(Exchange exchange, Object body, Exception e) {
+        exchange.setException(new StreamCacheException(body, e));
+        exchange.adapt(org.apache.camel.ExtendedExchange.class).setRedeliveryExhausted(true);
+    }
+
+}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 3826478429d..a7875995b02 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -215,8 +215,10 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
      * Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
@@ -231,12 +233,42 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
      * By default this feature is off.
      *
      * @return the builder
+     * @deprecated use {@link #useOriginalMessage()}
      */
+    @Deprecated
     public OnCompletionDefinition useOriginalBody() {
         setUseOriginalMessage(Boolean.toString(true));
         return this;
     }
 
+    /**
+     * Will use the original input message of the {@link org.apache.camel.Exchange} for this on completion.
+     * <p/>
+     * The original input message is defensively copied, and the copied message body is converted to
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * able to be re-read when accessed later.
+     * <p/>
+     * <b>Important:</b> The original input means the input message that is bounded by the current
+     * {@link org.apache.camel.spi.UnitOfWork}. A unit of work typically spans one route, or multiple routes if they
+     * are connected using internal endpoints such as direct or seda. When messages are passed via external endpoints
+     * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+     * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+     * for the messages in their sub-route (e.g. the split message); however these EIPs have an option named
+     * <tt>shareUnitOfWork</tt> which allows combining with the parent unit of work in regard to error handling and
+     * therefore use the parent original message.
+     * <p/>
+     * By default this feature is off.
+     *
+     * @return the builder
+     */
+    public OnCompletionDefinition useOriginalMessage() {
+        setUseOriginalMessage(Boolean.toString(true));
+        return this;
+    }
+
     /**
      * To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel
      * processing is automatic implied, and you do not have to enable that option as well.
@@ -346,7 +378,25 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
     /**
      * Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion.
      * <p/>
+     * The original input message is defensively copied, and the copied message body is converted to
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * able to re-read when accessed later.
+     * <p/>
+     * <b>Important:</b> The original input means the input message that are bounded by the current
+     * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they
+     * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints
+     * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+     * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+     * for the messages in their sub-route (eg the split message); however these EIPs have an option named
+     * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and
+     * therefore use the parent original message.
+     * <p/>
      * By default this feature is off.
+     *
+     * @param useOriginalMessage whether to use the original input message, as a <tt>true</tt>/<tt>false</tt> string
      */
     public void setUseOriginalMessage(String useOriginalMessage) {
         this.useOriginalMessage = useOriginalMessage;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
index e61d92dc1a9..26db6bd68c1 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
@@ -669,8 +669,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
      * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
@@ -711,8 +713,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
      * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
index 74ab640435e..a46c3368cc2 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
@@ -199,8 +199,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
      * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
@@ -241,8 +243,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
      * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
@@ -697,8 +701,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition {
      * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
      * <p/>
      * The original input message is defensively copied, and the copied message body is converted to
-     * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is
-     * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
+     * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route),
+     * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache}
+     * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+     * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be
      * able to re-read when accessed later.
      * <p/>
      * <b>Important:</b> The original input means the input message that are bounded by the current
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java
new file mode 100644
index 00000000000..86045242b3d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.onexception;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.StreamCache;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class OnExceptionUseOriginalMessageStreamTwoTest extends ContextTestSupport {
+
+    private final List<String> list1 = new ArrayList<>();
+    private final List<String> list2 = new ArrayList<>();
+
+    @Test
+    void convertUseOriginalMessage() {
+        String data = "data";
+        template.sendBody("direct:start", new ByteArrayInputStream(data.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+
+        // the original body must be fully readable, and re-readable, inside onException
+        Assertions.assertEquals(data, list1.get(0));
+        Assertions.assertEquals(data, list2.get(0));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .useOriginalMessage()
+                        .process(readBodyInto(list1))
+                        .process(readBodyInto(list2))
+                        .handled(true);
+
+                from("direct:start")
+                        .unmarshal(new MyDataFormat());
+            }
+        };
+    }
+
+    /**
+     * Creates a processor that asserts the body is a {@link StreamCache}, reads it as a String and records it.
+     */
+    private Processor readBodyInto(List<String> target) {
+        return exchange -> {
+            Assertions.assertTrue(exchange.getMessage().getBody() instanceof StreamCache);
+            target.add(exchange.getMessage().getBody(String.class));
+        };
+    }
+
+    public static class MyDataFormatException extends Exception {
+
+        public MyDataFormatException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Data format whose unmarshal consumes the whole input stream and then fails, to trigger onException.
+     */
+    public static class MyDataFormat extends ServiceSupport implements DataFormat {
+
+        @Override
+        public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+            // noop
+        }
+
+        @Override
+        public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+            // simulate reading the entire stream so its not re-readable later
+            String s = IOConverter.toString(stream, exchange);
+            throw new MyDataFormatException(s);
+        }
+    }
+}