You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2022/08/17 01:46:28 UTC

[camel] branch camel-3.18.x updated: CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)

This is an automated email from the ASF dual-hosted git repository.

zhfeng pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new c18f49b9589 CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)
c18f49b9589 is described below

commit c18f49b9589a451ce7013c9a5bc2cbac5a8a9b61
Author: Zheng Feng <zh...@gmail.com>
AuthorDate: Tue Aug 16 14:03:49 2022 +0800

    CAMEL-18377: camel-jpa - reuse an EntityManager from the current tran… (#8141)
    
    * CAMEL-18377: camel-jpa - reuse an EntityManager in the current transaction if possible
    
    * Fix to create txData only if the exchange is transacted
    
    * Add a unit test with pooling dataSource for transacted() combined with split()
    
    * Fix in multicast
    
    * Fix in RecipientList
    
    * Add an Enrich test
    
    * Fix CS
---
 components/camel-jpa/pom.xml                       |  5 ++
 .../org/apache/camel/component/jpa/JpaHelper.java  | 39 +++++++++
 .../camel/processor/jpa/JpaTransactedTest.java     | 94 ++++++++++++++++++++++
 .../src/test/resources/META-INF/persistence.xml    | 17 ++++
 .../processor/jpa/springJpaRoutePoolingTest.xml    | 36 +++++++++
 .../org/apache/camel/ExchangeConstantProvider.java |  3 +-
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../apache/camel/processor/MulticastProcessor.java | 10 ++-
 .../camel/processor/RecipientListProcessor.java    | 12 +++
 .../java/org/apache/camel/processor/Splitter.java  | 12 +++
 10 files changed, 227 insertions(+), 2 deletions(-)

diff --git a/components/camel-jpa/pom.xml b/components/camel-jpa/pom.xml
index da76413ad8c..66fd78568f5 100644
--- a/components/camel-jpa/pom.xml
+++ b/components/camel-jpa/pom.xml
@@ -82,6 +82,11 @@
             <artifactId>spring-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
index 463c888b1ce..0034aa67559 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -60,6 +60,14 @@ public final class JpaHelper {
             em = getEntityManagerMap(exchange).get(getKey(entityManagerFactory));
         }
 
+        // then try reuse any entity manager from the transaction context
+        if (em == null && exchange != null && exchange.isTransacted()) {
+            Map<String, Object> data = getTransactionContextData(exchange);
+            if (data != null) {
+                em = (EntityManager) data.get(getKey(entityManagerFactory));
+            }
+        }
+
         if (em == null && useSharedEntityManager) {
             em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
         }
@@ -75,6 +83,21 @@ public final class JpaHelper {
         return em;
     }
 
+    /**
+     * Copy JpaConstants.ENTITY_MANAGER property from source to target exchange.
+     *
+     * @param target The target exchange
+     * @param source The source exchange
+     */
+    public static void copyEntityManagers(Exchange target, Exchange source) {
+        if (target != null && source != null && target.getProperty(JpaConstants.ENTITY_MANAGER) == null) {
+            Map<String, EntityManager> entityManagers = source.getProperty(JpaConstants.ENTITY_MANAGER, Map.class);
+            if (entityManagers != null) {
+                target.setProperty(JpaConstants.ENTITY_MANAGER, entityManagers);
+            }
+        }
+    }
+
     private static EntityManager createEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory) {
         EntityManager em;
         em = entityManagerFactory.createEntityManager();
@@ -82,11 +105,27 @@ public final class JpaHelper {
             // we want to reuse the EM so store as property and make sure we close it when done with the exchange
             Map<String, EntityManager> entityManagers = getEntityManagerMap(exchange);
             entityManagers.put(getKey(entityManagerFactory), em);
+
+            // we want to reuse the EM in the same transaction
+            if (exchange.isTransacted()) {
+                Map<String, Object> data = getTransactionContextData(exchange);
+                if (data != null) {
+                    data.put(getKey(entityManagerFactory), em);
+                }
+            }
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new JpaCloseEntityManagerOnCompletion(em));
         }
         return em;
     }
 
+    private static Map<String, Object> getTransactionContextData(Exchange exchange) {
+        Map<String, Object> data = null;
+        if (exchange.isTransacted()) {
+            data = exchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA, Map.class);
+        }
+        return data;
+    }
+
     @SuppressWarnings("unchecked")
     private static Map<String, EntityManager> getEntityManagerMap(Exchange exchange) {
         Map<String, EntityManager> entityManagers = exchange.getProperty(JpaConstants.ENTITY_MANAGER, Map.class);
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java
new file mode 100644
index 00000000000..61f64e86b4b
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jpa.JpaHelper;
+import org.apache.camel.examples.SendEmail;
+import org.junit.jupiter.api.Test;
+
+public class JpaTransactedTest extends AbstractJpaTest {
+    protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+
+    @Test
+    public void testTransactedSplit() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+        template.sendBody("direct:split", Arrays.asList(
+                new SendEmail("test1@example.org"), new SendEmail("test2@example.org")));
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testTransactedMulticast() throws Exception {
+        template.sendBody("direct:multicast", new SendEmail("test@example.org"));
+    }
+
+    @Test
+    public void testTransactedRecipientList() throws Exception {
+        template.sendBody("direct:recipient", new SendEmail("test@example.org"));
+    }
+
+    @Test
+    public void testTransactedEnrich() throws Exception {
+        template.sendBody("direct:enrich", new SendEmail("test@example.org"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:split")
+                        .transacted().split().body()
+                        .to("jpa://" + SendEmail.class.getName())
+                        .to("mock:result");
+
+                from("direct:multicast")
+                        .transacted().multicast()
+                        .to("jpa://" + SendEmail.class.getName(), "jpa://" + SendEmail.class.getName());
+
+                from("direct:recipient")
+                        .transacted().recipientList(
+                                constant("jpa://" + SendEmail.class.getName() + "," + "jpa://" + SendEmail.class.getName()));
+
+                from("direct:enrich")
+                        .transacted().enrich("jpa://" + SendEmail.class.getName(), new AggregationStrategy() {
+                            @Override
+                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                JpaHelper.copyEntityManagers(oldExchange, newExchange);
+                                return oldExchange;
+                            }
+                        })
+                        .to("jpa://" + SendEmail.class.getName());
+            }
+        };
+    }
+
+    @Override
+    protected String routeXml() {
+        return "org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml";
+    }
+
+    @Override
+    protected String selectAllString() {
+        return SELECT_ALL_STRING;
+    }
+}
diff --git a/components/camel-jpa/src/test/resources/META-INF/persistence.xml b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
index f0c146b1d63..76f6cd393a1 100644
--- a/components/camel-jpa/src/test/resources/META-INF/persistence.xml
+++ b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
@@ -63,6 +63,23 @@
     </properties>
   </persistence-unit>
 
+  <persistence-unit name="pooling" transaction-type="RESOURCE_LOCAL">
+    <class>org.apache.camel.examples.SendEmail</class>
+
+    <properties>
+      <property name="openjpa.ConnectionProperties"
+                value="DriverClassName=org.apache.derby.jdbc.EmbeddedDriver,
+                   Url=jdbc:derby:target/custom;create=true,
+                   MaxTotal=1,
+                   MaxWaitMillis=1000,
+                   TestOnBorrow=true"/>
+      <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp2.BasicDataSource"/>
+      <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
+      <property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO"/>
+      <property name="openjpa.Multithreaded" value="true"/>
+    </properties>
+  </persistence-unit>
+
   <!-- START SNIPPET: e1 -->
   <persistence-unit name="idempotentDb" transaction-type="RESOURCE_LOCAL">
     <class>org.apache.camel.processor.idempotent.jpa.MessageProcessed</class>
diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml
new file mode 100644
index 00000000000..ce4232d3b2f
--- /dev/null
+++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
+        <property name="persistenceUnitName" value="pooling"/>
+    </bean>
+
+    <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
+        <property name="entityManagerFactory" ref="entityManagerFactory"/>
+    </bean>
+
+    <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
+        <property name="transactionManager" ref="transactionManager"/>
+    </bean>
+
+</beans>
diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index d495cac7f9a..4fd9528ca69 100644
--- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -11,7 +11,7 @@ public class ExchangeConstantProvider {
 
     private static final Map<String, String> MAP;
     static {
-        Map<String, String> map = new HashMap<>(154);
+        Map<String, String> map = new HashMap<>(155);
         map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
         map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard");
         map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -159,6 +159,7 @@ public class ExchangeConstantProvider {
         map.put("TRACE_EVENT_TIMESTAMP", "CamelTraceEventTimestamp");
         map.put("TRACING_HEADER_FORMAT", "CamelTracingHeaderFormat");
         map.put("TRACING_OUTPUT_FORMAT", "CamelTracingOutputFormat");
+        map.put("TRANSACTION_CONTEXT_DATA", "CamelTransactionContextData");
         map.put("TRANSFER_ENCODING", "Transfer-Encoding");
         map.put("TRY_ROUTE_BLOCK", "TryRouteBlock");
         map.put("UNIT_OF_WORK_EXHAUSTED", "CamelUnitOfWorkExhausted");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 3ba0e4b5edb..7e248c0e578 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -254,6 +254,7 @@ public interface Exchange {
     String TRACING_HEADER_FORMAT = "CamelTracingHeaderFormat";
     @Deprecated
     String TRACING_OUTPUT_FORMAT = "CamelTracingOutputFormat";
+    String TRANSACTION_CONTEXT_DATA = "CamelTransactionContextData";
     String TRY_ROUTE_BLOCK = "TryRouteBlock";
     String TRANSFER_ENCODING = "Transfer-Encoding";
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index fb231ab762c..2cc448af400 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -899,6 +899,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
     protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange)
             throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
+        Map<String, Object> txData = null;
 
         StreamCache streamCache = null;
         if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) {
@@ -911,7 +912,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
             // copy exchange, and do not share the unit of work
             Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
             copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
-
+            // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+            // during the transaction.
+            if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+                if (txData == null) {
+                    txData = new ConcurrentHashMap<>();
+                }
+                copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+            }
             if (streamCache != null) {
                 if (index > 0) {
                     // copy it otherwise parallel processing is not possible,
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 7dbd3741e87..8c1a735a86d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
@@ -69,6 +71,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     private final String delimiter;
     private final ProducerCache producerCache;
     private int cacheSize;
+    private Map<String, Object> txData;
 
     /**
      * Class that represent each step in the recipient list to do
@@ -290,6 +293,15 @@ public class RecipientListProcessor extends MulticastProcessor {
         Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
         copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
 
+        // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+        // during the transaction.
+        if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+            if (txData == null) {
+                txData = new ConcurrentHashMap<>();
+            }
+            copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+        }
+
         // if we share unit of work, we need to prepare the child exchange
         if (isShareUnitOfWork()) {
             prepareSharedUnitOfWork(copy, exchange);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 4b9b7adf2e1..597bc007391 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
@@ -203,6 +205,8 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
                 private int index;
                 private boolean closed;
 
+                private Map<String, Object> txData;
+
                 public boolean hasNext() {
                     if (closed) {
                         return false;
@@ -229,6 +233,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
                         // and do not share the unit of work
                         Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
                         newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted());
+                        // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
+                        // during the transaction.
+                        if (original.isTransacted() && newExchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
+                            if (txData == null) {
+                                txData = new ConcurrentHashMap<>();
+                            }
+                            newExchange.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData);
+                        }
                         // If the splitter has an aggregation strategy
                         // then the StreamCache created by the child routes must not be
                         // closed by the unit of work of the child route, but by the unit of