You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2013/10/19 02:10:32 UTC

git commit: CAMEL-6874: Avoid using the same EntityManager object across multiple threads, as EntityManagers are per se not thread-safe. Also polished the code a bit and removed some unused/aimless logic from the JpaProducer#process() method.

Updated Branches:
  refs/heads/master cd82ef8b3 -> 8bcaa4524


CAMEL-6874: Avoid using the same EntityManager object across multiple threads, as EntityManagers are per se not thread-safe. Also polished the code a bit and removed some unused/aimless logic from the JpaProducer#process() method.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bcaa452
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bcaa452
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bcaa452

Branch: refs/heads/master
Commit: 8bcaa4524d57ca473593b7dff46c40f8172b2233
Parents: cd82ef8
Author: Babak Vahdat <bv...@apache.org>
Authored: Sat Oct 19 02:10:21 2013 +0200
Committer: Babak Vahdat <bv...@apache.org>
Committed: Sat Oct 19 02:10:21 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jpa/JpaConsumer.java | 11 ++++-
 .../apache/camel/component/jpa/JpaEndpoint.java | 13 +-----
 .../apache/camel/component/jpa/JpaProducer.java | 45 ++++++++++----------
 .../component/jpa/AbstractJpaMethodTest.java    |  2 +-
 .../org/apache/camel/component/jpa/JpaTest.java |  2 +-
 .../camel/component/jpa/JpaUseMergeTest.java    |  4 +-
 .../jpa/JpaWithNamedQueryAndParametersTest.java |  2 +-
 .../component/jpa/JpaWithNamedQueryTest.java    |  9 ++--
 8 files changed, 45 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index e94c113..91ffd29 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -44,7 +44,6 @@ import org.springframework.transaction.support.TransactionTemplate;
  * @version 
  */
 public class JpaConsumer extends ScheduledBatchPollingConsumer {
-
     private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class);
     private final JpaEndpoint endpoint;
     private final EntityManager entityManager;
@@ -71,7 +70,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
     public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        this.entityManager = endpoint.getEntityManager();
+        this.entityManager = endpoint.createEntityManager();
         this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
@@ -441,4 +440,12 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager);
         return exchange;
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        entityManager.close();
+        LOG.trace("closed the EntityManager {} on {}", entityManager, this);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 78c64d2..97d7fdc 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -43,7 +43,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
 
     private EntityManagerFactory entityManagerFactory;
     private PlatformTransactionManager transactionManager;
-    private EntityManager entityManager;
     private String persistenceUnit = "camel";
     private Expression producerExpression;
     private int maximumResults = -1;
@@ -263,11 +262,8 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         return tm;
     }
 
-    protected EntityManager getEntityManager() {
-        if (entityManager == null) {
-            entityManager = getEntityManagerFactory().createEntityManager();
-        }
-        return entityManager;
+    protected EntityManager createEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
     }
 
     protected TransactionTemplate createTransactionTemplate() {
@@ -302,9 +298,4 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         };
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-//        entityManager.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index 8393a9e..cb981ce 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -16,15 +16,15 @@
  */
 package org.apache.camel.component.jpa;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
 import javax.persistence.EntityManager;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -33,19 +33,23 @@ import org.springframework.transaction.support.TransactionTemplate;
  * @version 
  */
 public class JpaProducer extends DefaultProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(JpaProducer.class);
     private final EntityManager entityManager;
     private final TransactionTemplate transactionTemplate;
-    private final JpaEndpoint endpoint;
     private final Expression expression;
 
     public JpaProducer(JpaEndpoint endpoint, Expression expression) {
         super(endpoint);
-        this.endpoint = endpoint;
         this.expression = expression;
-        this.entityManager = endpoint.getEntityManager();
+        this.entityManager = endpoint.createEntityManager();
         this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
+    @Override
+    public JpaEndpoint getEndpoint() {
+        return (JpaEndpoint) super.getEndpoint();
+    }
+
     public void process(final Exchange exchange) {
         exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager);
         final Object values = expression.evaluate(exchange, Object.class);
@@ -53,35 +57,24 @@ public class JpaProducer extends DefaultProducer {
             transactionTemplate.execute(new TransactionCallback<Object>() {
                 public Object doInTransaction(TransactionStatus status) {
                     entityManager.joinTransaction();
-
                     if (values.getClass().isArray()) {
                         Object[] array = (Object[])values;
                         for (int index = 0; index < array.length; index++) {
-                            Object managedEntity = save(array[index], entityManager);
-                            if (!endpoint.isUsePersist()) {
-                                array[index] = managedEntity;
-                            }
+                            save(array[index], entityManager);
                         }
                     } else if (values instanceof Collection) {
-                        @SuppressWarnings("unchecked")
-                        Collection<Object> collection = (Collection<Object>)values;
-                        List<Object> managedEntities = new ArrayList<Object>();
+                        Collection<?> collection = (Collection<?>)values;
                         for (Object entity : collection) {
-                            Object managedEntity = save(entity, entityManager);
-                            managedEntities.add(managedEntity);
-                        }
-                        if (!endpoint.isUsePersist()) {
-                            collection.clear();
-                            collection.addAll(managedEntities);
+                            save(entity, entityManager);
                         }
                     } else {
                         Object managedEntity = save(values, entityManager);
-                        if (!endpoint.isUsePersist()) {
+                        if (!getEndpoint().isUsePersist()) {
                             exchange.getIn().setBody(managedEntity);
                         }
                     }
 
-                    if (endpoint.isFlushOnSend()) {
+                    if (getEndpoint().isFlushOnSend()) {
                         // there may be concurrency so need to join tx before flush
                         entityManager.joinTransaction();
                         entityManager.flush();
@@ -97,7 +90,7 @@ public class JpaProducer extends DefaultProducer {
                 private Object save(final Object entity, EntityManager entityManager) {
                     // there may be concurrency so need to join tx before persist/merge
                     entityManager.joinTransaction();
-                    if (endpoint.isUsePersist()) {
+                    if (getEndpoint().isUsePersist()) {
                         entityManager.persist(entity);
                         return entity;
                     } else {
@@ -108,4 +101,12 @@ public class JpaProducer extends DefaultProducer {
         }
         exchange.getIn().removeHeader(JpaConstants.ENTITYMANAGER);
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        entityManager.close();
+        LOG.trace("closed the EntityManager {} on {}", entityManager, this);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
index 6f8db08..bc0aca4 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
@@ -148,7 +148,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport {
         endpoint = context.getEndpoint(endpointUri, JpaEndpoint.class);
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.getEntityManager();
+        entityManager = endpoint.createEntityManager();
         
         transactionTemplate.execute(new TransactionCallback<Object>() {
             public Object doInTransaction(TransactionStatus status) {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
index 7125526..8977271 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
@@ -118,7 +118,7 @@ public class JpaTest extends Assert {
         endpoint = (JpaEndpoint) value;
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.getEntityManager();
+        entityManager = endpoint.createEntityManager();
     }
 
     protected String getEndpointUri() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java
index 1a5fe17..8106c5a 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java
@@ -49,7 +49,9 @@ public class JpaUseMergeTest extends AbstractJpaMethodTest {
         
         assertEntitiesInDatabase(1, Customer.class.getName());
         assertEntitiesInDatabase(1, Address.class.getName());
-        
+
+        // do detach the persisted entity first before modifying it
+        entityManager.detach(customer);
         customer.setName("Max Mustermann");
         customer.getAddress().setAddressLine1("Musterstr. 1");
         customer.getAddress().setAddressLine2("11111 Enterhausen");

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
index 8494cc6..667ab99 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
@@ -138,7 +138,7 @@ public class JpaWithNamedQueryAndParametersTest extends Assert {
         endpoint = (JpaEndpoint)value;
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.getEntityManager();
+        entityManager = endpoint.createEntityManager();
     }
 
     protected String getEndpointUri() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
index 85126e4..d6bae56 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
@@ -111,10 +111,12 @@ public class JpaWithNamedQueryTest extends Assert {
 
         transactionTemplate.execute(new TransactionCallback<Object>() {
             public Object doInTransaction(TransactionStatus status) {
-                entityManager.joinTransaction();
+                // make use of the EntityManager having the relevant persistence-context
+                EntityManager entityManager2 = receivedExchange.getIn().getHeader(JpaConstants.ENTITYMANAGER, EntityManager.class);
+                entityManager2.joinTransaction();
 
                 // now lets assert that there are still 2 entities left
-                List<?> rows = entityManager.createQuery("select x from MultiSteps x").getResultList();
+                List<?> rows = entityManager2.createQuery("select x from MultiSteps x").getResultList();
                 assertEquals("Number of entities: " + rows, 2, rows.size());
 
                 int counter = 1;
@@ -125,7 +127,6 @@ public class JpaWithNamedQueryTest extends Assert {
 
                     if (row.getAddress().equals("foo@bar.com")) {
                         LOG.info("Found updated row: " + row);
-
                         assertEquals("Updated row step for: " + row, getUpdatedStepValue(), row.getStep());
                     } else {
                         // dummy row
@@ -166,7 +167,7 @@ public class JpaWithNamedQueryTest extends Assert {
         endpoint = (JpaEndpoint)value;
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.getEntityManager();
+        entityManager = endpoint.createEntityManager();
     }
 
     protected String getEndpointUri() {