You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/14 17:13:28 UTC

camel git commit: CAMEL-8054: Added option to share enity manager to camel-jpa. Thanks to Chriss Watts for the patch.

Repository: camel
Updated Branches:
  refs/heads/master 23b975a8f -> 3a98dd9d7


CAMEL-8054: Added option to share enity manager to camel-jpa. Thanks to Chriss Watts for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a98dd9d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a98dd9d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a98dd9d

Branch: refs/heads/master
Commit: 3a98dd9d79aab778276ca96969599c49be82ec17
Parents: 23b975a
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 14 17:16:21 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 14 17:16:21 2015 +0200

----------------------------------------------------------------------
 .../camel/component/jpa/JpaComponent.java       |  14 ++
 .../apache/camel/component/jpa/JpaConsumer.java |  16 ++-
 .../apache/camel/component/jpa/JpaEndpoint.java |  21 ++-
 .../apache/camel/component/jpa/JpaHelper.java   |   8 +-
 .../apache/camel/component/jpa/JpaProducer.java |   3 +-
 .../idempotent/jpa/JpaMessageIdRepository.java  |  17 ++-
 .../jpa/JpaRouteSharedEntityManagerTest.java    | 130 +++++++++++++++++++
 7 files changed, 200 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
index 73416e2..c2c0227 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
@@ -37,6 +37,7 @@ public class JpaComponent extends UriEndpointComponent {
     private EntityManagerFactory entityManagerFactory;
     private PlatformTransactionManager transactionManager;
     private boolean joinTransaction = true;
+    private boolean sharedEntityManager;
 
     public JpaComponent() {
         super(JpaEndpoint.class);
@@ -80,6 +81,18 @@ public class JpaComponent extends UriEndpointComponent {
         this.joinTransaction = joinTransaction;
     }
 
+    public boolean isSharedEntityManager() {
+        return sharedEntityManager;
+    }
+
+    /**
+     * Whether to use Spring's SharedEntityManager for the consumer/producer.
+     * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager.
+     */
+    public void setSharedEntityManager(boolean sharedEntityManager) {
+        this.sharedEntityManager = sharedEntityManager;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -87,6 +100,7 @@ public class JpaComponent extends UriEndpointComponent {
     protected Endpoint createEndpoint(String uri, String path, Map<String, Object> options) throws Exception {
         JpaEndpoint endpoint = new JpaEndpoint(uri, this);
         endpoint.setJoinTransaction(isJoinTransaction());
+        endpoint.setSharedEntityManager(isSharedEntityManager());
 
         // lets interpret the next string as a class
         if (ObjectHelper.isNotEmpty(path)) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 9824f7d..2bc82a2 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+
 import javax.persistence.Entity;
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
@@ -39,6 +40,7 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -500,7 +502,12 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.entityManager = entityManagerFactory.createEntityManager();
+
+        if (getEndpoint().isSharedEntityManager()) {
+            this.entityManager = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
+        } else {
+            this.entityManager = entityManagerFactory.createEntityManager();
+        }
         LOG.trace("Created EntityManager {} on {}", entityManager, this);
     }
 
@@ -512,7 +519,10 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
     @Override
     protected void doShutdown() throws Exception {
         super.doShutdown();
-        this.entityManager.close();
-        LOG.trace("Closed EntityManager {} on {}", entityManager, this);
+
+        if (entityManager != null) {
+            this.entityManager.close();
+            LOG.trace("Closed EntityManager {} on {}", entityManager, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 578689f..e725758 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.springframework.orm.jpa.JpaTransactionManager;
 import org.springframework.orm.jpa.LocalEntityManagerFactoryBean;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -56,6 +57,8 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
     private String persistenceUnit = "camel";
     @UriParam(defaultValue = "true")
     private boolean joinTransaction = true;
+    @UriParam
+    private boolean sharedEntityManager;
     @UriParam(label = "consumer", defaultValue = "-1")
     private int maximumResults = -1;
     @UriParam(label = "consumer", defaultValue = "true")
@@ -314,6 +317,18 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         this.usePassedInEntityManager = usePassedIn;
     }
 
+    public boolean isSharedEntityManager() {
+        return sharedEntityManager;
+    }
+
+    /**
+     * Whether to use Spring's SharedEntityManager for the consumer/producer.
+     * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager.
+     */
+    public void setSharedEntityManager(boolean sharedEntityManager) {
+        this.sharedEntityManager = sharedEntityManager;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -340,7 +355,11 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
      */
     @Deprecated
     protected EntityManager createEntityManager() {
-        return getEntityManagerFactory().createEntityManager();
+        if (sharedEntityManager) {
+            return SharedEntityManagerCreator.createSharedEntityManager(getEntityManagerFactory());
+        } else {
+            return getEntityManagerFactory().createEntityManager();
+        }
     }
 
     protected TransactionTemplate createTransactionTemplate() {

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
index 81df51b..44a5913 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -20,6 +20,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 
 import org.apache.camel.Exchange;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
 
 /**
  * Helper for JPA.
@@ -36,10 +37,11 @@ public final class JpaHelper {
      * @param entityManagerFactory     the entity manager factory (mandatory)
      * @param usePassedInEntityManager whether to use an existing {@link javax.persistence.EntityManager} which has been stored
      *                                 on the exchange in the header with key {@link org.apache.camel.component.jpa.JpaConstants#ENTITY_MANAGER}
+     * @param useSharedEntityManager   whether to use SharedEntityManagerCreator if not already passed in                             
      * @return the entity manager (is never null)
      */
     public static EntityManager getTargetEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory,
-                                                       boolean usePassedInEntityManager) {
+                                                       boolean usePassedInEntityManager, boolean useSharedEntityManager) {
         EntityManager em = null;
 
         // favor using entity manager provided as a header from the end user
@@ -52,6 +54,10 @@ public final class JpaHelper {
             em = exchange.getProperty(JpaConstants.ENTITY_MANAGER, EntityManager.class);
         }
 
+        if (em == null && useSharedEntityManager) {
+            em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
+        }
+        
         if (em == null) {
             // create a new entity manager
             em = entityManagerFactory.createEntityManager();

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index 2fee422..cf96488 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -54,7 +54,8 @@ public class JpaProducer extends DefaultProducer {
 
     public void process(final Exchange exchange) {
         // resolve the entity manager before evaluating the expression
-        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, getEndpoint().isUsePassedInEntityManager());
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
+                getEndpoint().isUsePassedInEntityManager(), getEndpoint().isSharedEntityManager());
         final Object values = expression.evaluate(exchange, Object.class);
 
         if (values != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index e352cff..4af8658 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -50,6 +50,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
     private final EntityManagerFactory entityManagerFactory;
     private final TransactionTemplate transactionTemplate;
     private boolean joinTransaction = true;
+    private boolean sharedEntityManager;
 
     public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String processorName) {
         this(entityManagerFactory, createTransactionTemplate(entityManagerFactory), processorName);
@@ -83,7 +84,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
 
     @Override
     public boolean add(final Exchange exchange, final String messageId) {
-        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
 
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@@ -118,7 +119,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
 
     @Override
     public boolean contains(final Exchange exchange, final String messageId) {
-        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
 
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@@ -147,7 +148,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
 
     @Override
     public boolean remove(final Exchange exchange, final String messageId) {
-        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
 
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
             public Boolean doInTransaction(TransactionStatus status) {
@@ -203,6 +204,15 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
         this.joinTransaction = joinTransaction;
     }
 
+    @ManagedAttribute(description = "Whether to use shared EntityManager")
+    public boolean isSharedEntityManager() {
+        return sharedEntityManager;
+    }
+
+    public void setSharedEntityManager(boolean sharedEntityManager) {
+        this.sharedEntityManager = sharedEntityManager;
+    }
+    
     @Override
     protected void doStart() throws Exception {
         // noop
@@ -212,4 +222,5 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
     protected void doStop() throws Exception {
         // noop
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
new file mode 100644
index 0000000..c534da1
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.examples.SendEmail;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.junit.Test;
+import org.springframework.context.expression.BeanFactoryResolver;
+import org.springframework.expression.Expression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.orm.jpa.LocalEntityManagerFactoryBean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+/**
+ * @version 
+ */
+public class JpaRouteSharedEntityManagerTest extends AbstractJpaTest {
+    protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+    private CountDownLatch latch = new CountDownLatch(1);
+    
+    @Test
+    public void testRouteJpaShared() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        int countStart = getBrokerCount();
+        assertThat("brokerCount", countStart, equalTo(1));
+
+        template.sendBody("direct:startShared", new SendEmail("one@somewhere.org"));
+        // start route
+        context.startRoute("jpaShared");
+
+        // not the cleanest way to check the number of open connections
+        int countEnd = getBrokerCount();
+        assertThat("brokerCount", countEnd, equalTo(1));
+        
+        latch.countDown();
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private int getBrokerCount() {
+        LocalEntityManagerFactoryBean entityManagerFactory = applicationContext.getBean("&entityManagerFactory", LocalEntityManagerFactoryBean.class);
+
+        //uses Spring EL so we don't need to reference the classes
+        StandardEvaluationContext context = new StandardEvaluationContext(entityManagerFactory);
+        context.setBeanResolver(new BeanFactoryResolver(applicationContext));
+        SpelExpressionParser parser = new SpelExpressionParser();
+        Expression expression = parser.parseExpression("nativeEntityManagerFactory.brokerFactory.openBrokers"); 
+        List<?> brokers = expression.getValue(context, List.class);
+
+        return brokers.size();
+    }
+    
+    @Test
+    public void testRouteJpaNotShared() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:startNotshared", new SendEmail("one@somewhere.org"));
+
+        int countStart = getBrokerCount();
+        assertThat("brokerCount", countStart, equalTo(1));
+
+        // start route
+        context.startRoute("jpaOwn");
+
+        // not the cleanest way to check the number of open connections
+        int countEnd = getBrokerCount();
+        assertThat("brokerCount", countEnd, equalTo(2));
+        
+        latch.countDown();
+
+        assertMockEndpointsSatisfied();
+    }    
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new SpringRouteBuilder() {
+            public void configure() {
+                from("direct:startNotshared").to("jpa://" + SendEmail.class.getName() + "?");
+                from("direct:startShared").to("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false");
+                from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false").routeId("jpaShared").autoStartup(false).process(new LatchProcessor()).to("mock:result");
+                from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=false").routeId("jpaOwn").autoStartup(false).process(new LatchProcessor()).to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected String routeXml() {
+        return "org/apache/camel/processor/jpa/springJpaRouteTest.xml";
+    }
+
+    @Override
+    protected String selectAllString() {
+        return SELECT_ALL_STRING;
+    }
+    
+    private class LatchProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            latch.await(2, TimeUnit.SECONDS);
+        }
+    }
+}
+