You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2022/03/01 15:55:16 UTC

[camel] branch main updated: CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)

This is an automated email from the ASF dual-hosted git repository.

zhfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 991f9ca  CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)
991f9ca is described below

commit 991f9ca7898bfea362821ef115d930fece317230
Author: Amos Feng <zh...@gmail.com>
AuthorDate: Tue Mar 1 23:54:39 2022 +0800

    CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)
    
    * CAMEL-17474: camel-core - set transacted when copying exchange in multicast processor
    
    * CAMEL-17474: camel-core - similar fix with RecipientList and Split EIP
    
    * CAMEL-17474: add a test for Enrich EIP
    
    * CAMEL-17474 add assertions to check the copied exchanges are marked as transactional
---
 .../apache/camel/processor/MulticastProcessor.java |   1 +
 .../camel/processor/RecipientListProcessor.java    |   2 +
 .../java/org/apache/camel/processor/Splitter.java  |   2 +
 .../org/apache/camel/itest/tx/JtaRouteTest.java    | 129 +++++++++++++++++++--
 4 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6da1142..fb231ab 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -910,6 +910,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         for (Processor processor : processors) {
             // copy exchange, and do not share the unit of work
             Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+            copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
 
             if (streamCache != null) {
                 if (index > 0) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 3a0e943..7dbd374 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,6 +32,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -287,6 +288,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
         // copy exchange, and do not share the unit of work
         Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+        copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
 
         // if we share unit of work, we need to prepare the child exchange
         if (isShareUnitOfWork()) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 5383bc9..4b9b7ad 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -32,6 +32,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
@@ -227,6 +228,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
                         // create a correlated copy as the new exchange to be routed in the splitter from the copy
                         // and do not share the unit of work
                         Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
+                        newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted());
                         // If the splitter has an aggregation strategy
                         // then the StreamCache created by the child routes must not be
                         // closed by the unit of work of the child route, but by the unit of
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
index 2c95a0b..4c2cc7f 100644
--- a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
@@ -16,20 +16,50 @@
  */
 package org.apache.camel.itest.tx;
 
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class JtaRouteTest extends CamelTestSupport {
     @EndpointInject("mock:splitted")
     private MockEndpoint splitted;
 
-    @EndpointInject("direct:requires_new")
-    private ProducerTemplate start;
+    @EndpointInject("mock:test")
+    private MockEndpoint test;
+
+    @EndpointInject("mock:a")
+    private MockEndpoint a;
+
+    @EndpointInject("mock:b")
+    private MockEndpoint b;
+
+    @EndpointInject("mock:c")
+    private MockEndpoint c;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @EndpointInject("direct:split_test")
+    private ProducerTemplate split;
+
+    @EndpointInject("direct:multicast_test")
+    private ProducerTemplate multicast;
+
+    @EndpointInject("direct:recipient_test")
+    private ProducerTemplate recipient;
+
+    @EndpointInject("direct:enrich_test")
+    private ProducerTemplate enrich;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -38,23 +68,104 @@ public class JtaRouteTest extends CamelTestSupport {
             public void configure() throws Exception {
                 context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new RequiresNewJtaTransactionPolicy());
 
-                from("direct:requires_new")
+                from("direct:split_test")
                         .transacted("PROPAGATION_REQUIRES_NEW")
-                        .split(body()).delimiter("_").to("direct:splitted").end()
-                        .log("after splitter log which you will never see...")
-                        .transform().constant("requires_new");
+                        .to("direct:split");
+
+                from("direct:split")
+                        .split(body()).delimiter("_").to("mock:splitted").end()
+                        .log("after splitter log which you will never see...");
+
+                from("direct:multicast_test").routeId("r.route1")
+                        .log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
+                        .transacted()
+                        .to("direct:multicast")
+                        .log("will never get here");
+
+                from("direct:multicast").routeId("r.route2")
+                        .log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
+                        .multicast()
+                        .to("log:r.test", "direct:route3", "mock:test")
+                        .end();
+
+                from("direct:route3").routeId("r.route3")
+                        .process(e -> Assertions.assertTrue(e.isTransacted()))
+                        .log(LoggingLevel.DEBUG, "Entering route: ${routeId}");
+
+                from("direct:recipient_test")
+                        .transacted()
+                        .to("direct:recipient");
+
+                from("direct:recipient")
+                        .recipientList(constant("mock:a", "mock:b", "mock:c"));
+
+                from("direct:enrich_test")
+                        .transacted()
+                        .to("direct:enrich");
+
+                from("direct:enrich")
+                        .enrich("direct:content", new AggregationStrategy() {
+                            @Override
+                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                if (newExchange == null) {
+                                    return oldExchange;
+                                }
+                                Object oldBody = oldExchange.getIn().getBody();
+                                Object newBody = newExchange.getIn().getBody();
+                                oldExchange.getIn().setBody(oldBody + " " + newBody);
+                                return oldExchange;
+                            }
+                        })
+                        .to("mock:result");
+
+                from("direct:content").transform().constant("Enrich");
 
-                from("direct:splitted").to("mock:splitted");
             }
         };
     }
 
     @Test
     void testTransactedSplit() throws Exception {
+        splitted.setExpectedMessageCount(2);
         splitted.expectedBodiesReceived("requires", "new");
+        splitted.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));
 
-        start.sendBody("requires_new");
-
+        split.sendBody("requires_new");
         splitted.assertIsSatisfied();
     }
+
+    @Test
+    public void testTransactedMultiCast() throws Exception {
+        test.setExpectedMessageCount(1);
+        test.expectedBodiesReceived("multicast");
+        test.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted())); // multicast delivers to mock:test
+
+        multicast.sendBody("multicast");
+        test.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTransactedRecipient() throws Exception {
+        Arrays.asList(a, b, c).forEach(m -> {
+            m.setExpectedMessageCount(1);
+            m.expectedBodiesReceived("recipient");
+            m.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));
+        });
+
+        recipient.sendBody("recipient");
+
+        for (MockEndpoint m : Arrays.asList(a, b, c)) {
+            m.assertIsSatisfied();
+        }
+    }
+
+    @Test
+    public void testTransactedEnrich() throws Exception {
+        result.setExpectedMessageCount(1);
+        result.expectedBodiesReceived("Message Enrich");
+        result.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));
+
+        enrich.sendBody("Message");
+        result.assertIsSatisfied();
+    }
 }