You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2022/08/17 01:46:28 UTC
[camel] branch camel-3.18.x updated: CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)
This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new c18f49b9589 CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)
c18f49b9589 is described below
commit c18f49b9589a451ce7013c9a5bc2cbac5a8a9b61
Author: Zheng Feng <zh...@gmail.com>
AuthorDate: Tue Aug 16 14:03:49 2022 +0800
CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)
* CAMEL-18377: camel-jpa - reuse an EntityManager in the current transaction if possible
* Fix to create txData only if the exchange is transacted
* Add a unit test with a pooling DataSource for transacted() combined with split()
* Fix in multicast
* Fix in RecipientList
* Add an Enrich test
* Fix CS
---
components/camel-jpa/pom.xml | 5 ++
.../org/apache/camel/component/jpa/JpaHelper.java | 39 +++++++++
.../camel/processor/jpa/JpaTransactedTest.java | 94 ++++++++++++++++++++++
.../src/test/resources/META-INF/persistence.xml | 17 ++++
.../processor/jpa/springJpaRoutePoolingTest.xml | 36 +++++++++
.../org/apache/camel/ExchangeConstantProvider.java | 3 +-
.../src/main/java/org/apache/camel/Exchange.java | 1 +
.../apache/camel/processor/MulticastProcessor.java | 10 ++-
.../camel/processor/RecipientListProcessor.java | 12 +++
.../java/org/apache/camel/processor/Splitter.java | 12 +++
10 files changed, 227 insertions(+), 2 deletions(-)
diff --git a/components/camel-jpa/pom.xml b/components/camel-jpa/pom.xml
index da76413ad8c..66fd78568f5 100644
--- a/components/camel-jpa/pom.xml
+++ b/components/camel-jpa/pom.xml
@@ -82,6 +82,11 @@
<artifactId>spring-jdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
index 463c888b1ce..0034aa67559 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -60,6 +60,14 @@ public final class JpaHelper {
em = getEntityManagerMap(exchange).get(getKey(entityManagerFactory));
}
+ // then try reuse any entity manager from the transaction context
+ if (em == null && exchange != null && exchange.isTransacted()) {
+ Map<String, Object> data = getTransactionContextData(exchange);
+ if (data != null) {
+ em = (EntityManager) data.get(getKey(entityManagerFactory));
+ }
+ }
+
if (em == null && useSharedEntityManager) {
em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
}
@@ -75,6 +83,21 @@ public final class JpaHelper {
return em;
}
+ /**
+ * Copy JpaConstants.ENTITY_MANAGER property from source to target exchange.
+ *
+ * @param target The target exchange
+ * @param source The source exchange
+ */
+ public static void copyEntityManagers(Exchange target, Exchange source) {
+ if (target != null && source != null && target.getProperty(JpaConstants.ENTITY_MANAGER) == null) {
+ Map<String, EntityManager> entityManagers = source.getProperty(JpaConstants.ENTITY_MANAGER, Map.class);
+ if (entityManagers != null) {
+ target.setProperty(JpaConstants.ENTITY_MANAGER, entityManagers);
+ }
+ }
+ }
+
private static EntityManager createEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory) {
EntityManager em;
em = entityManagerFactory.createEntityManager();
@@ -82,11 +105,27 @@ public final class JpaHelper {
// we want to reuse the EM so store as property and make sure we close it when done with the exchange
Map<String, EntityManager> entityManagers = getEntityManagerMap(exchange);
entityManagers.put(getKey(entityManagerFactory), em);
+
+ // we want to reuse the EM in the same transaction
+ if (exchange.isTransacted()) {
+ Map<String, Object> data = getTransactionContextData(exchange);
+ if (data != null) {
+ data.put(getKey(entityManagerFactory), em);
+ }
+ }
exchange.adapt(ExtendedExchange.class).addOnCompletion(new JpaCloseEntityManagerOnCompletion(em));
}
return em;
}
+ private static Map<String, Object> getTransactionContextData(Exchange exchange) {
+ Map<String, Object> data = null;
+ if (exchange.isTransacted()) {
+ data = exchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA, Map.class);
+ }
+ return data;
+ }
+
@SuppressWarnings("unchecked")
private static Map<String, EntityManager> getEntityManagerMap(Exchange exchange) {
Map<String, EntityManager> entityManagers = exchange.getProperty(JpaConstants.ENTITY_MANAGER, Map.class);
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java
new file mode 100644
index 00000000000..61f64e86b4b
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jpa.JpaHelper;
+import org.apache.camel.examples.SendEmail;
+import org.junit.jupiter.api.Test;
+
+public class JpaTransactedTest extends AbstractJpaTest {
+ protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+
+ @Test
+ public void testTransactedSplit() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(2);
+ template.sendBody("direct:split", Arrays.asList(
+ new SendEmail("test1@example.org"), new SendEmail("test2@example.org")));
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testTransactedMulticast() throws Exception {
+ template.sendBody("direct:multicast", new SendEmail("test@example.org"));
+ }
+
+ @Test
+ public void testTransactedRecipientList() throws Exception {
+ template.sendBody("direct:recipient", new SendEmail("test@example.org"));
+ }
+
+ @Test
+ public void testTransactedEnrich() throws Exception {
+ template.sendBody("direct:enrich", new SendEmail("test@example.org"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:split")
+ .transacted().split().body()
+ .to("jpa://" + SendEmail.class.getName())
+ .to("mock:result");
+
+ from("direct:multicast")
+ .transacted().multicast()
+ .to("jpa://" + SendEmail.class.getName(), "jpa://" + SendEmail.class.getName());
+
+ from("direct:recipient")
+ .transacted().recipientList(
+ constant("jpa://" + SendEmail.class.getName() + "," + "jpa://" + SendEmail.class.getName()));
+
+ from("direct:enrich")
+ .transacted().enrich("jpa://" + SendEmail.class.getName(), new AggregationStrategy() {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ JpaHelper.copyEntityManagers(oldExchange, newExchange);
+ return oldExchange;
+ }
+ })
+ .to("jpa://" + SendEmail.class.getName());
+ }
+ };
+ }
+
+ @Override
+ protected String routeXml() {
+ return "org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml";
+ }
+
+ @Override
+ protected String selectAllString() {
+ return SELECT_ALL_STRING;
+ }
+}
diff --git a/components/camel-jpa/src/test/resources/META-INF/persistence.xml b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
index f0c146b1d63..76f6cd393a1 100644
--- a/components/camel-jpa/src/test/resources/META-INF/persistence.xml
+++ b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
@@ -63,6 +63,23 @@
</properties>
</persistence-unit>
+ <persistence-unit name="pooling" transaction-type="RESOURCE_LOCAL">
+ <class>org.apache.camel.examples.SendEmail</class>
+
+ <properties>
+ <property name="openjpa.ConnectionProperties"
+ value="DriverClassName=org.apache.derby.jdbc.EmbeddedDriver,
+ Url=jdbc:derby:target/custom;create=true,
+ MaxTotal=1,
+ MaxWaitMillis=1000,
+ TestOnBorrow=true"/>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp2.BasicDataSource"/>
+ <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
+ <property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO"/>
+ <property name="openjpa.Multithreaded" value="true"/>
+ </properties>
+ </persistence-unit>
+
<!-- START SNIPPET: e1 -->
<persistence-unit name="idempotentDb" transaction-type="RESOURCE_LOCAL">
<class>org.apache.camel.processor.idempotent.jpa.MessageProcessed</class>
diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml
new file mode 100644
index 00000000000..ce4232d3b2f
--- /dev/null
+++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
+ <property name="persistenceUnitName" value="pooling"/>
+ </bean>
+
+ <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
+ <property name="entityManagerFactory" ref="entityManagerFactory"/>
+ </bean>
+
+ <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
+ <property name="transactionManager" ref="transactionManager"/>
+ </bean>
+
+</beans>
diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index d495cac7f9a..4fd9528ca69 100644
--- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -11,7 +11,7 @@ public class ExchangeConstantProvider {
private static final Map<String, String> MAP;
static {
- Map<String, String> map = new HashMap<>(154);
+ Map<String, String> map = new HashMap<>(155);
map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard");
map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -159,6 +159,7 @@ public class ExchangeConstantProvider {
map.put("TRACE_EVENT_TIMESTAMP", "CamelTraceEventTimestamp");
map.put("TRACING_HEADER_FORMAT", "CamelTracingHeaderFormat");
map.put("TRACING_OUTPUT_FORMAT", "CamelTracingOutputFormat");
+ map.put("TRANSACTION_CONTEXT_DATA", "CamelTransactionContextData");
map.put("TRANSFER_ENCODING", "Transfer-Encoding");
map.put("TRY_ROUTE_BLOCK", "TryRouteBlock");
map.put("UNIT_OF_WORK_EXHAUSTED", "CamelUnitOfWorkExhausted");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 3ba0e4b5edb..7e248c0e578 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -254,6 +254,7 @@ public interface Exchange {
String TRACING_HEADER_FORMAT = "CamelTracingHeaderFormat";
@Deprecated
String TRACING_OUTPUT_FORMAT = "CamelTracingOutputFormat";
+ String TRANSACTION_CONTEXT_DATA = "CamelTransactionContextData";
String TRY_ROUTE_BLOCK = "TryRouteBlock";
String TRANSFER_ENCODING = "Transfer-Encoding";
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index fb231ab762c..2cc448af400 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -899,6 +899,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange)
throws Exception {
List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
+ Map<String, Object> txData = null;
StreamCache streamCache = null;
if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) {
@@ -911,7 +912,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
// copy exchange, and do not share the unit of work
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
-
+ // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+ // during the transaction.
+ if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+ if (txData == null) {
+ txData = new ConcurrentHashMap<>();
+ }
+ copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+ }
if (streamCache != null) {
if (index > 0) {
// copy it otherwise parallel processing is not possible,
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 7dbd3741e87..8c1a735a86d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AggregationStrategy;
@@ -69,6 +71,7 @@ public class RecipientListProcessor extends MulticastProcessor {
private final String delimiter;
private final ProducerCache producerCache;
private int cacheSize;
+ private Map<String, Object> txData;
/**
* Class that represent each step in the recipient list to do
@@ -290,6 +293,15 @@ public class RecipientListProcessor extends MulticastProcessor {
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
+ // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+ // during the transaction.
+ if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+ if (txData == null) {
+ txData = new ConcurrentHashMap<>();
+ }
+ copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+ }
+
// if we share unit of work, we need to prepare the child exchange
if (isShareUnitOfWork()) {
prepareSharedUnitOfWork(copy, exchange);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 4b9b7adf2e1..597bc007391 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AggregationStrategy;
@@ -203,6 +205,8 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
private int index;
private boolean closed;
+ private Map<String, Object> txData;
+
public boolean hasNext() {
if (closed) {
return false;
@@ -229,6 +233,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
// and do not share the unit of work
Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted());
+ // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+ // during the transaction.
+ if (original.isTransacted() && newExchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+ if (txData == null) {
+ txData = new ConcurrentHashMap<>();
+ }
+ newExchange.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+ }
// If the splitter has an aggregation strategy
// then the StreamCache created by the child routes must not be
// closed by the unit of work of the child route, but by the unit of