You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2022/03/01 16:09:18 UTC
[camel] branch camel-3.14.x updated: CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)
This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
new f30ef47 CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)
f30ef47 is described below
commit f30ef47f0c9bf84a8d7d464033b6cbb16b1fbdb8
Author: Amos Feng <zh...@gmail.com>
AuthorDate: Tue Mar 1 23:54:39 2022 +0800
CAMEL-17474: camel-core - set transacted when copying exchange in mul… (#7040)
* CAMEL-17474: camel-core - set transacted when copying exchange in multicast processor
* CAMEL-17474: camel-core - similar fix with RecipientList and Split EIP
* CAMEL-17474: add a test for Enrich EIP
* CAMEL-17474 add assertions to check the copied exchanges are marked as transactional
---
.../apache/camel/processor/MulticastProcessor.java | 1 +
.../camel/processor/RecipientListProcessor.java | 2 +
.../java/org/apache/camel/processor/Splitter.java | 2 +
.../org/apache/camel/itest/tx/JtaRouteTest.java | 129 +++++++++++++++++++--
4 files changed, 125 insertions(+), 9 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 4c9dcce..8b093f6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -922,6 +922,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
for (Processor processor : processors) {
// copy exchange, and do not share the unit of work
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+ copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
if (streamCache != null) {
if (index > 0) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 1c472d4..517fb97 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,6 +32,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -288,6 +289,7 @@ public class RecipientListProcessor extends MulticastProcessor {
Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
// copy exchange, and do not share the unit of work
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+ copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
// if we share unit of work, we need to prepare the child exchange
if (isShareUnitOfWork()) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index b822ec8..360579e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -32,6 +32,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Route;
@@ -251,6 +252,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
// create a correlated copy as the new exchange to be routed in the splitter from the copy
// and do not share the unit of work
Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
+ newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted());
// If the splitter has an aggregation strategy
// then the StreamCache created by the child routes must not be
// closed by the unit of work of the child route, but by the unit of
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
index 2c95a0b..4c2cc7f 100644
--- a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
@@ -16,20 +16,50 @@
*/
package org.apache.camel.itest.tx;
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class JtaRouteTest extends CamelTestSupport {
@EndpointInject("mock:splitted")
private MockEndpoint splitted;
- @EndpointInject("direct:requires_new")
- private ProducerTemplate start;
+ @EndpointInject("mock:test")
+ private MockEndpoint test;
+
+ @EndpointInject("mock:a")
+ private MockEndpoint a;
+
+ @EndpointInject("mock:b")
+ private MockEndpoint b;
+
+ @EndpointInject("mock:c")
+ private MockEndpoint c;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @EndpointInject("direct:split_test")
+ private ProducerTemplate split;
+
+ @EndpointInject("direct:multicast_test")
+ private ProducerTemplate multicast;
+
+ @EndpointInject("direct:recipient_test")
+ private ProducerTemplate recipient;
+
+ @EndpointInject("direct:enrich_test")
+ private ProducerTemplate enrich;
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -38,23 +68,104 @@ public class JtaRouteTest extends CamelTestSupport {
public void configure() throws Exception {
context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new RequiresNewJtaTransactionPolicy());
- from("direct:requires_new")
+ from("direct:split_test")
.transacted("PROPAGATION_REQUIRES_NEW")
- .split(body()).delimiter("_").to("direct:splitted").end()
- .log("after splitter log which you will never see...")
- .transform().constant("requires_new");
+ .to("direct:split");
+
+ from("direct:split")
+ .split(body()).delimiter("_").to("mock:splitted").end()
+ .log("after splitter log which you will never see...");
+
+ from("direct:multicast_test").routeId("r.route1")
+ .log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
+ .transacted()
+ .to("direct:multicast")
+ .log("will never get here");
+
+ from("direct:multicast").routeId("r.route2")
+ .log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
+ .multicast()
+ .to("log:r.test", "direct:route3", "mock:test")
+ .end();
+
+ from("direct:route3").routeId("r.route3")
+ .process(e -> Assertions.assertTrue(e.isTransacted()))
+ .log(LoggingLevel.DEBUG, "Entering route: ${routeId}");
+
+ from("direct:recipient_test")
+ .transacted()
+ .to("direct:recipient");
+
+ from("direct:recipient")
+ .recipientList(constant("mock:a", "mock:b", "mock:c"));
+
+ from("direct:enrich_test")
+ .transacted()
+ .to("direct:enrich");
+
+ from("direct:enrich")
+ .enrich("direct:content", new AggregationStrategy() {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (newExchange == null) {
+ return oldExchange;
+ }
+ Object oldBody = oldExchange.getIn().getBody();
+ Object newBody = newExchange.getIn().getBody();
+ oldExchange.getIn().setBody(oldBody + " " + newBody);
+ return oldExchange;
+ }
+ })
+ .to("mock:result");
+
+ from("direct:content").transform().constant("Enrich");
- from("direct:splitted").to("mock:splitted");
}
};
}
@Test
void testTransactedSplit() throws Exception {
+ splitted.setExpectedMessageCount(2); // "requires_new" is split on "_" into two messages
splitted.expectedBodiesReceived("requires", "new");
+ splitted.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted())); // every exchange arriving at mock:splitted must be marked transacted
- start.sendBody("requires_new");
-
+ split.sendBody("requires_new"); // sent to direct:split_test, which is transacted with PROPAGATION_REQUIRES_NEW
splitted.assertIsSatisfied();
}
+
+ @Test // Verifies that the exchange the multicast EIP delivers to mock:test is marked transacted.
+ public void testTransactedMultiCast() throws Exception {
+ test.setExpectedMessageCount(1);
+ test.expectedBodiesReceived("multicast");
+ test.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted())); // register on mock:test: the multicast route never sends to mock:splitted, so a check there would never run
+
+ multicast.sendBody("multicast"); // sent to direct:multicast_test, which is transacted before handing off to direct:multicast
+ test.assertIsSatisfied();
+ }
+
+ @Test // Verifies that each recipient-list target (mock:a, mock:b, mock:c) receives a transacted exchange.
+ public void testTransactedRecipient() throws Exception {
+ Arrays.asList(a, b, c).forEach(m -> {
+ m.setExpectedMessageCount(1);
+ m.expectedBodiesReceived("recipient");
+ m.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted())); // the exchange arriving at each recipient must be marked transacted
+ });
+
+ recipient.sendBody("recipient"); // sent to direct:recipient_test, which is transacted before routing to direct:recipient
+
+ for (MockEndpoint m : Arrays.asList(a, b, c)) {
+ m.assertIsSatisfied();
+ }
+ }
+
+ @Test // Verifies that the exchange reaching mock:result after the enrich step is marked transacted.
+ public void testTransactedEnrich() throws Exception {
+ result.setExpectedMessageCount(1);
+ result.expectedBodiesReceived("Message Enrich"); // the route's aggregation strategy joins the original body and the "Enrich" constant from direct:content with a space
+ result.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));
+
+ enrich.sendBody("Message"); // sent to direct:enrich_test, which is transacted before routing to direct:enrich
+ result.assertIsSatisfied();
+ }
}