You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/14 17:13:28 UTC
camel git commit: CAMEL-8054: Added option to share enity manager to
camel-jpa. Thanks to Chriss Watts for the patch.
Repository: camel
Updated Branches:
refs/heads/master 23b975a8f -> 3a98dd9d7
CAMEL-8054: Added option to share enity manager to camel-jpa. Thanks to Chriss Watts for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a98dd9d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a98dd9d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a98dd9d
Branch: refs/heads/master
Commit: 3a98dd9d79aab778276ca96969599c49be82ec17
Parents: 23b975a
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 14 17:16:21 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 14 17:16:21 2015 +0200
----------------------------------------------------------------------
.../camel/component/jpa/JpaComponent.java | 14 ++
.../apache/camel/component/jpa/JpaConsumer.java | 16 ++-
.../apache/camel/component/jpa/JpaEndpoint.java | 21 ++-
.../apache/camel/component/jpa/JpaHelper.java | 8 +-
.../apache/camel/component/jpa/JpaProducer.java | 3 +-
.../idempotent/jpa/JpaMessageIdRepository.java | 17 ++-
.../jpa/JpaRouteSharedEntityManagerTest.java | 130 +++++++++++++++++++
7 files changed, 200 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
index 73416e2..c2c0227 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
@@ -37,6 +37,7 @@ public class JpaComponent extends UriEndpointComponent {
private EntityManagerFactory entityManagerFactory;
private PlatformTransactionManager transactionManager;
private boolean joinTransaction = true;
+ private boolean sharedEntityManager;
public JpaComponent() {
super(JpaEndpoint.class);
@@ -80,6 +81,18 @@ public class JpaComponent extends UriEndpointComponent {
this.joinTransaction = joinTransaction;
}
+ public boolean isSharedEntityManager() {
+ return sharedEntityManager;
+ }
+
+ /**
+ * Whether to use Spring's SharedEntityManager for the consumer/producer.
+ * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager.
+ */
+ public void setSharedEntityManager(boolean sharedEntityManager) {
+ this.sharedEntityManager = sharedEntityManager;
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
@@ -87,6 +100,7 @@ public class JpaComponent extends UriEndpointComponent {
protected Endpoint createEndpoint(String uri, String path, Map<String, Object> options) throws Exception {
JpaEndpoint endpoint = new JpaEndpoint(uri, this);
endpoint.setJoinTransaction(isJoinTransaction());
+ endpoint.setSharedEntityManager(isSharedEntityManager());
// lets interpret the next string as a class
if (ObjectHelper.isNotEmpty(path)) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 9824f7d..2bc82a2 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
@@ -39,6 +40,7 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
@@ -500,7 +502,12 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.entityManager = entityManagerFactory.createEntityManager();
+
+ if (getEndpoint().isSharedEntityManager()) {
+ this.entityManager = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
+ } else {
+ this.entityManager = entityManagerFactory.createEntityManager();
+ }
LOG.trace("Created EntityManager {} on {}", entityManager, this);
}
@@ -512,7 +519,10 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
- this.entityManager.close();
- LOG.trace("Closed EntityManager {} on {}", entityManager, this);
+
+ if (entityManager != null) {
+ this.entityManager.close();
+ LOG.trace("Closed EntityManager {} on {}", entityManager, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 578689f..e725758 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalEntityManagerFactoryBean;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
@@ -56,6 +57,8 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
private String persistenceUnit = "camel";
@UriParam(defaultValue = "true")
private boolean joinTransaction = true;
+ @UriParam
+ private boolean sharedEntityManager;
@UriParam(label = "consumer", defaultValue = "-1")
private int maximumResults = -1;
@UriParam(label = "consumer", defaultValue = "true")
@@ -314,6 +317,18 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
this.usePassedInEntityManager = usePassedIn;
}
+ public boolean isSharedEntityManager() {
+ return sharedEntityManager;
+ }
+
+ /**
+ * Whether to use Spring's SharedEntityManager for the consumer/producer.
+ * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager.
+ */
+ public void setSharedEntityManager(boolean sharedEntityManager) {
+ this.sharedEntityManager = sharedEntityManager;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -340,7 +355,11 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
*/
@Deprecated
protected EntityManager createEntityManager() {
- return getEntityManagerFactory().createEntityManager();
+ if (sharedEntityManager) {
+ return SharedEntityManagerCreator.createSharedEntityManager(getEntityManagerFactory());
+ } else {
+ return getEntityManagerFactory().createEntityManager();
+ }
}
protected TransactionTemplate createTransactionTemplate() {
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
index 81df51b..44a5913 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -20,6 +20,7 @@ import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import org.apache.camel.Exchange;
+import org.springframework.orm.jpa.SharedEntityManagerCreator;
/**
* Helper for JPA.
@@ -36,10 +37,11 @@ public final class JpaHelper {
* @param entityManagerFactory the entity manager factory (mandatory)
* @param usePassedInEntityManager whether to use an existing {@link javax.persistence.EntityManager} which has been stored
* on the exchange in the header with key {@link org.apache.camel.component.jpa.JpaConstants#ENTITY_MANAGER}
+ * @param useSharedEntityManager whether to use SharedEntityManagerCreator if not already passed in
* @return the entity manager (is never null)
*/
public static EntityManager getTargetEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory,
- boolean usePassedInEntityManager) {
+ boolean usePassedInEntityManager, boolean useSharedEntityManager) {
EntityManager em = null;
// favor using entity manager provided as a header from the end user
@@ -52,6 +54,10 @@ public final class JpaHelper {
em = exchange.getProperty(JpaConstants.ENTITY_MANAGER, EntityManager.class);
}
+ if (em == null && useSharedEntityManager) {
+ em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
+ }
+
if (em == null) {
// create a new entity manager
em = entityManagerFactory.createEntityManager();
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index 2fee422..cf96488 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -54,7 +54,8 @@ public class JpaProducer extends DefaultProducer {
public void process(final Exchange exchange) {
// resolve the entity manager before evaluating the expression
- final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, getEndpoint().isUsePassedInEntityManager());
+ final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
+ getEndpoint().isUsePassedInEntityManager(), getEndpoint().isSharedEntityManager());
final Object values = expression.evaluate(exchange, Object.class);
if (values != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index e352cff..4af8658 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -50,6 +50,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
private final EntityManagerFactory entityManagerFactory;
private final TransactionTemplate transactionTemplate;
private boolean joinTransaction = true;
+ private boolean sharedEntityManager;
public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String processorName) {
this(entityManagerFactory, createTransactionTemplate(entityManagerFactory), processorName);
@@ -83,7 +84,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
@Override
public boolean add(final Exchange exchange, final String messageId) {
- final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+ final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
// Run this in single transaction.
Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@@ -118,7 +119,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
@Override
public boolean contains(final Exchange exchange, final String messageId) {
- final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+ final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
// Run this in single transaction.
Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@@ -147,7 +148,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
@Override
public boolean remove(final Exchange exchange, final String messageId) {
- final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true);
+ final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager);
Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
public Boolean doInTransaction(TransactionStatus status) {
@@ -203,6 +204,15 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
this.joinTransaction = joinTransaction;
}
+ @ManagedAttribute(description = "Whether to use shared EntityManager")
+ public boolean isSharedEntityManager() {
+ return sharedEntityManager;
+ }
+
+ public void setSharedEntityManager(boolean sharedEntityManager) {
+ this.sharedEntityManager = sharedEntityManager;
+ }
+
@Override
protected void doStart() throws Exception {
// noop
@@ -212,4 +222,5 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId
protected void doStop() throws Exception {
// noop
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
new file mode 100644
index 0000000..c534da1
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.examples.SendEmail;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.junit.Test;
+import org.springframework.context.expression.BeanFactoryResolver;
+import org.springframework.expression.Expression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.orm.jpa.LocalEntityManagerFactoryBean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+/**
+ * @version
+ */
+public class JpaRouteSharedEntityManagerTest extends AbstractJpaTest {
+ protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ @Test
+ public void testRouteJpaShared() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ int countStart = getBrokerCount();
+ assertThat("brokerCount", countStart, equalTo(1));
+
+ template.sendBody("direct:startShared", new SendEmail("one@somewhere.org"));
+ // start route
+ context.startRoute("jpaShared");
+
+ // not the cleanest way to check the number of open connections
+ int countEnd = getBrokerCount();
+ assertThat("brokerCount", countEnd, equalTo(1));
+
+ latch.countDown();
+
+ assertMockEndpointsSatisfied();
+ }
+
+ private int getBrokerCount() {
+ LocalEntityManagerFactoryBean entityManagerFactory = applicationContext.getBean("&entityManagerFactory", LocalEntityManagerFactoryBean.class);
+
+ //uses Spring EL so we don't need to reference the classes
+ StandardEvaluationContext context = new StandardEvaluationContext(entityManagerFactory);
+ context.setBeanResolver(new BeanFactoryResolver(applicationContext));
+ SpelExpressionParser parser = new SpelExpressionParser();
+ Expression expression = parser.parseExpression("nativeEntityManagerFactory.brokerFactory.openBrokers");
+ List<?> brokers = expression.getValue(context, List.class);
+
+ return brokers.size();
+ }
+
+ @Test
+ public void testRouteJpaNotShared() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:startNotshared", new SendEmail("one@somewhere.org"));
+
+ int countStart = getBrokerCount();
+ assertThat("brokerCount", countStart, equalTo(1));
+
+ // start route
+ context.startRoute("jpaOwn");
+
+ // not the cleanest way to check the number of open connections
+ int countEnd = getBrokerCount();
+ assertThat("brokerCount", countEnd, equalTo(2));
+
+ latch.countDown();
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new SpringRouteBuilder() {
+ public void configure() {
+ from("direct:startNotshared").to("jpa://" + SendEmail.class.getName() + "?");
+ from("direct:startShared").to("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false");
+ from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false").routeId("jpaShared").autoStartup(false).process(new LatchProcessor()).to("mock:result");
+ from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=false").routeId("jpaOwn").autoStartup(false).process(new LatchProcessor()).to("mock:result");
+ }
+ };
+ }
+
+ @Override
+ protected String routeXml() {
+ return "org/apache/camel/processor/jpa/springJpaRouteTest.xml";
+ }
+
+ @Override
+ protected String selectAllString() {
+ return SELECT_ALL_STRING;
+ }
+
+ private class LatchProcessor implements Processor {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ latch.await(2, TimeUnit.SECONDS);
+ }
+ }
+}
+