You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/23 12:32:20 UTC

(camel) branch var-fail created (now 87d35c17db6)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 87d35c17db6 CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

This branch includes the following new commits:

     new 951c77f1364 CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
     new c170879018b CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
     new 055632ddc17 CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
     new b54b3ba73fe CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
     new 87d35c17db6 CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(camel) 01/05: CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 951c77f1364285230abd1ba36413f345906b48f6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 09:21:21 2024 +0100

    CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
---
 .../java/org/apache/camel/processor/Enricher.java  |  2 +-
 .../org/apache/camel/processor/PollEnricher.java   |  2 +-
 .../camel/processor/SendDynamicProcessor.java      |  2 +-
 .../org/apache/camel/processor/SendProcessor.java  |  4 ++--
 .../org/apache/camel/support/ExchangeHelper.java   | 22 ++++++++++++++++++++++
 5 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 5dcabc96fac..97d757a5f99 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -222,7 +222,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
                         Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                         if (aggregatedExchange != null) {
-                            if (variableReceive != null) {
+                            if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
                                 // result should be stored in variable instead of message body
                                 ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
                                         exchange.getMessage());
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index d939963dd26..f6cac002f3d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -350,7 +350,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
                 // must catch any exception from aggregation
                 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                 if (aggregatedExchange != null) {
-                    if (variableReceive != null) {
+                    if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
                         // result should be stored in variable instead of message body
                         ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
                         exchange.getMessage().setBody(originalBody);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index d877f180684..94c41c1b107 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -237,7 +237,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
                     ServiceHelper.stopAndShutdownService(endpoint);
                 }
                 // result should be stored in variable instead of message body
-                if (variableReceive != null) {
+                if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
                     ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
                     exchange.getMessage().setBody(originalBody);
                     exchange.getMessage().setHeaders(originalHeaders);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 04840285980..0f0bde0a265 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -179,7 +179,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
                 ac = doneSync -> {
                     try {
                         // result should be stored in variable instead of message body/headers
-                        if (variableReceive != null) {
+                        if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
                             ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
                                     exchange.getMessage());
                             exchange.getMessage().setBody(originalBody);
@@ -240,7 +240,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
                         // restore previous MEP
                         exchange.setPattern(existingPattern);
                         // result should be stored in variable instead of message body/headers
-                        if (variableReceive != null) {
+                        if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
                             ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
                                     exchange.getMessage());
                             exchange.getMessage().setBody(originalBody);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 2cdf7071450..faa8470376a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1156,6 +1156,28 @@ public final class ExchangeHelper {
         }
     }
 
+    /**
+     * Whether the processing of the {@link Exchange} was successful, so that the result should be stored in the variable.
+     *
+     * @param  exchange the exchange to check (failed, rollback-only, rollback-only-last and error-handler-handled state)
+     * @param  name     the variable name, if null then no result should be stored
+     * @return          true if setVariableFromMessageBodyAndHeaders should be called afterwards to set the result
+     */
+    public static boolean shouldSetVariableResult(Exchange exchange, String name) {
+        if (name == null) {
+            return false;
+        }
+        // same logic as in Pipeline/PipelineHelper
+        boolean stop = exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()
+                || exchange.getExchangeExtension().isErrorHandlerHandledSet()
+                        && exchange.getExchangeExtension().isErrorHandlerHandled();
+        if (stop) {
+            return false;
+        }
+        // success, so the result can be stored in the variable
+        return true;
+    }
+
     /**
      * Gets the variable
      *


(camel) 03/05: CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 055632ddc171ad9fcf3f8fe9a128bbdfd9bf22c2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 09:57:53 2024 +0100

    CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
---
 .../camel/processor/ToVariableErrorTest.java       | 291 +++++++++++++++++++++
 .../org/apache/camel/support/ExchangeHelper.java   |   7 +-
 2 files changed, 295 insertions(+), 3 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java
new file mode 100644
index 00000000000..f3351a2941d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ToVariableErrorTest extends ContextTestSupport {
+    // NOTE: every test in this class calls direct:foo via toV(...) with the variable "bye" and checks when it gets set
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false; // each test adds its own routes and starts the context itself
+    }
+
+    @Test
+    public void testThrowException() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+        // direct:foo throws after transforming the body: exchange is failed, no variable is set, mock:result gets nothing
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        // TODO: should this be World or Bye World? (the assertion below expects the transformed body from direct:foo)
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testTryCatch() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .doTry()
+                            .throwException(new IllegalArgumentException("Forced"))
+                        .doCatch(Exception.class)
+                            .setBody(simple("Catch: ${body}"))
+                        .end();
+            }
+        });
+        context.start();
+        // exception is caught by doTry/doCatch inside direct:foo: not failed, variable "bye" is set, body stays World
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertTrue(out.hasVariables());
+        Assertions.assertEquals("World", out.getMessage().getBody());
+        Assertions.assertEquals("Catch: Bye World", out.getVariable("bye"));
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnExceptionHandled() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .handled(true)
+                        .setBody(simple("Error: ${body}"));
+
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+        // onException with handled(true): not failed, but still no variable is set and the body comes from onException
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnExceptionNotHandled() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .handled(false)
+                        .setBody(simple("Error: ${body}"));
+
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+        // onException with handled(false): exchange is failed, no variable is set, body comes from onException
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDeadLetterChannel() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+        // dead letter channel takes the message (mock:dead gets one): not failed, no variable is set
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDefaultErrorHandler() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler());
+
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+        // explicit defaultErrorHandler(): exchange is failed, no variable is set, mock:result gets nothing
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testStop() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .stop();
+            }
+        });
+        context.start();
+        // direct:foo calls stop(): not failed, no variable is set and routing does not continue to mock:result
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRollback() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .rollback();
+            }
+        });
+        context.start();
+        // direct:foo calls rollback(): exchange is failed, no variable is set, mock:result gets nothing
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testMarkRollbackLast() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .markRollbackOnly();
+            }
+        });
+        context.start();
+        // NOTE(review): despite the method name this route uses markRollbackOnly(); expects not failed and no variable
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testMarkRollbackOnlyLast() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .toV("direct:foo", null, "bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .markRollbackOnlyLast();
+            }
+        });
+        context.start();
+        // direct:foo calls markRollbackOnlyLast(): not failed, no variable is set, mock:result gets nothing
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index faa8470376a..a73db916db9 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1168,9 +1168,10 @@ public final class ExchangeHelper {
             return false;
         }
         // same logic as in Pipeline/PipelineHelper
-        boolean stop = exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()
-                || exchange.getExchangeExtension().isErrorHandlerHandledSet()
-                        && exchange.getExchangeExtension().isErrorHandlerHandled();
+        boolean stop
+                = exchange.isRouteStop() || exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()
+                        || exchange.getExchangeExtension().isErrorHandlerHandledSet()
+                                && exchange.getExchangeExtension().isErrorHandlerHandled();
         if (stop) {
             return false;
         }


(camel) 02/05: CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c170879018b2eba9c6eb88ff3692fa799fd59e9d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 09:25:22 2024 +0100

    CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
---
 .../modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc   | 16 ++++++++++++++++
 .../modules/ROOT/pages/camel-4x-upgrade-guide.adoc       |  1 +
 2 files changed, 17 insertions(+)

diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc
new file mode 100644
index 00000000000..d297df9c99e
--- /dev/null
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc
@@ -0,0 +1,16 @@
+= Apache Camel 4.x Upgrade Guide
+
+This document is for helping you upgrade your Apache Camel application
+from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2, then you should follow the guides
+from both 4.0 to 4.1 and 4.1 to 4.2.
+
+== Upgrading Camel 4.5 to 4.6
+
+=== variables
+
+When using `variableReceive`, the variable is only set if processing the `Exchange` completed successfully.
+
+For example, when calling a route that fails due to an exception being thrown (even if `onException` or `errorHandler` are in use),
+the variable is no longer set. The same applies if the exchange is marked for rollback, or if routing is stopped with `.stop()`.
+
+This is the same logic that the routing engine uses, whether to continue routing the `Exchange` or not.
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
index d82ec3704e4..903ec6b126e 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
@@ -14,4 +14,5 @@ You can find upgrade guide for each release in the following pages:
 - xref:camel-4x-upgrade-guide-4_3.adoc[Upgrade guide 4.2 -> 4.3]
 - xref:camel-4x-upgrade-guide-4_4.adoc[Upgrade guide 4.3 -> 4.4]
 - xref:camel-4x-upgrade-guide-4_5.adoc[Upgrade guide 4.4 -> 4.5]
+- xref:camel-4x-upgrade-guide-4_6.adoc[Upgrade guide 4.5 -> 4.6]
 


(camel) 05/05: CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 87d35c17db647fc97e9e2d6b9637eaa3741c3c6f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 13:26:05 2024 +0100

    CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
---
 .../java/org/apache/camel/processor/Enricher.java  |  10 +-
 .../org/apache/camel/processor/PollEnricher.java   |   9 +-
 .../camel/processor/EnrichVariableErrorTest.java   | 291 +++++++++++++++++++++
 .../processor/PollEnrichVariableErrorTest.java     | 112 ++++++++
 4 files changed, 413 insertions(+), 9 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 97d757a5f99..9424fe89a61 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -222,12 +222,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
                         Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                         if (aggregatedExchange != null) {
-                            if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
+                            if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) {
                                 // result should be stored in variable instead of message body
-                                ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
-                                        exchange.getMessage());
-                                exchange.getMessage().setBody(originalBody);
-                                exchange.getMessage().setHeaders(originalHeaders);
+                                ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive,
+                                        aggregatedExchange.getMessage());
+                                aggregatedExchange.getMessage().setBody(originalBody);
+                                aggregatedExchange.getMessage().setHeaders(originalHeaders);
                             }
                             // copy aggregation result onto original exchange (preserving pattern)
                             copyResultsWithoutCorrelationId(exchange, aggregatedExchange);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index f6cac002f3d..e13ec57ed88 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -350,11 +350,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
                 // must catch any exception from aggregation
                 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                 if (aggregatedExchange != null) {
-                    if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
+                    if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) {
                         // result should be stored in variable instead of message body
-                        ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
-                        exchange.getMessage().setBody(originalBody);
-                        exchange.getMessage().setHeaders(originalHeaders);
+                        ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive,
+                                aggregatedExchange.getMessage());
+                        aggregatedExchange.getMessage().setBody(originalBody);
+                        aggregatedExchange.getMessage().setHeaders(originalHeaders);
                     }
                     // copy aggregation result onto original exchange (preserving pattern)
                     copyResultsPreservePattern(exchange, aggregatedExchange);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java
new file mode 100644
index 00000000000..3c199057eae
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class EnrichVariableErrorTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testThrowException() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        // TODO: should this be World or Bye World?
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testTryCatch() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .doTry()
+                            .throwException(new IllegalArgumentException("Forced"))
+                        .doCatch(Exception.class)
+                            .setBody(simple("Catch: ${body}"))
+                        .end();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertTrue(out.hasVariables());
+        Assertions.assertEquals("World", out.getMessage().getBody());
+        Assertions.assertEquals("Catch: Bye World", out.getVariable("bye"));
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnExceptionHandled() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .handled(true)
+                        .setBody(simple("Error: ${body}"));
+
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnExceptionNotHandled() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .handled(false)
+                        .setBody(simple("Error: ${body}"));
+
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDeadLetterChannel() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDefaultErrorHandler() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler());
+
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testStop() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .stop();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRollback() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .rollback();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testMarkRollbackLast() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .markRollbackOnly();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testMarkRollbackOnlyLast() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .transform().simple("Bye ${body}")
+                        .markRollbackOnlyLast();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java
new file mode 100644
index 00000000000..c1dd22ff715
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PollEnrichVariableErrorTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testThrowException() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        template.send("seda:foo", e -> {
+            e.getMessage().setBody("Bye World");
+            e.setException(new IllegalArgumentException());
+        });
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertTrue(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testStop() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class);
+        Exchange ex = se.createExchange();
+        ex.getMessage().setBody("Bye World");
+        ex.setRouteStop(true);
+        se.getQueue().add(ex);
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertTrue(out.isRouteStop());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRollbackOnly() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class);
+        Exchange ex = se.createExchange();
+        ex.getMessage().setBody("Bye World");
+        ex.setRollbackOnly(true);
+        se.getQueue().add(ex);
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
+        Assertions.assertFalse(out.isFailed());
+        Assertions.assertFalse(out.hasVariables());
+        Assertions.assertTrue(out.isRollbackOnly());
+        Assertions.assertEquals("Bye World", out.getMessage().getBody());
+        assertMockEndpointsSatisfied();
+    }
+
+}


(camel) 04/05: CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was processed successfully

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-fail
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b54b3ba73feb4d4cff183dcd4ed048788e7116d5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 10:26:55 2024 +0100

    CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was processed successfully
---
 .../org/apache/camel/processor/SendDynamicProcessor.java     |  8 ++++----
 .../main/java/org/apache/camel/processor/SendProcessor.java  | 12 +++++-------
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 94c41c1b107..decf336be57 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -237,10 +237,10 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
                     ServiceHelper.stopAndShutdownService(endpoint);
                 }
                 // result should be stored in variable instead of message body
-                if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
-                    ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
-                    exchange.getMessage().setBody(originalBody);
-                    exchange.getMessage().setHeaders(originalHeaders);
+                if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) {
+                    ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, target.getMessage());
+                    target.getMessage().setBody(originalBody);
+                    target.getMessage().setHeaders(originalHeaders);
                 }
                 // signal we are done
                 c.done(doneSync);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 0f0bde0a265..37dd575fccd 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -179,11 +179,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
                 ac = doneSync -> {
                     try {
                         // result should be stored in variable instead of message body/headers
-                        if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
-                            ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
-                                    exchange.getMessage());
-                            exchange.getMessage().setBody(originalBody);
-                            exchange.getMessage().setHeaders(originalHeaders);
+                        if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) {
+                            ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive,
+                                    target.getMessage());
+                            target.getMessage().setBody(originalBody);
+                            target.getMessage().setHeaders(originalHeaders);
                         }
                         // restore previous MEP
                         target.setPattern(existingPattern);
@@ -202,8 +202,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
                 if (variableSend != null) {
                     Object value = ExchangeHelper.getVariable(exchange, variableSend);
                     exchange.getMessage().setBody(value);
-                    // TODO: empty headers or
-
                 }
 
                 LOG.debug(">>>> {} {}", destination, exchange);