You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/30 14:49:39 UTC
[10/12] camel git commit: CAMEL-10490 Changed to optimistic lock mode
type and added test for concurrent access without lock
CAMEL-10490 Changed to optimistic lock mode type and added test for concurrent access without lock
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c7db1ab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c7db1ab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c7db1ab
Branch: refs/heads/camel-2.18.x
Commit: 9c7db1abaa6f956f7b577dda7ad966bcba44d85f
Parents: 40c6fa3
Author: Bob Gaudaen <bo...@gmail.com>
Authored: Wed Nov 30 13:39:21 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Nov 30 15:48:52 2016 +0100
----------------------------------------------------------------------
.../camel/component/jpa/JpaPollingConsumer.java | 38 +++++---
.../org/apache/camel/examples/Customer.java | 6 +-
.../camel/processor/jpa/AbstractJpaTest.java | 22 ++++-
.../jpa/JpaPollingConsumerLockEntityTest.java | 98 +++++++++++++-------
.../processor/jpa/JpaPollingConsumerTest.java | 17 +---
5 files changed, 116 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
index f787393..8b085ad 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
@@ -29,6 +29,7 @@ import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Query;
import javax.persistence.LockModeType;
+import javax.persistence.PersistenceException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.PollingConsumerSupport;
@@ -144,21 +145,32 @@ public class JpaPollingConsumer extends PollingConsumerSupport {
LOG.trace("Created query {}", query);
Object answer;
- List<?> results = query.getResultList();
- if (results != null && results.size() == 1) {
- // we only have 1 entity so return that
- answer = results.get(0);
- } else {
- // we have more data so return a list
- answer = results;
- }
+ try {
+ List<?> results = query.getResultList();
+
+ if (results != null && results.size() == 1) {
+ // we only have 1 entity so return that
+ answer = results.get(0);
+ } else {
+ // we have more data so return a list
+ answer = results;
+ }
+
+ // commit
+ LOG.debug("Flushing EntityManager");
+ entityManager.flush();
- // commit
- LOG.debug("Flushing EntityManager");
- entityManager.flush();
- // must clear after flush
- entityManager.clear();
+ // must clear after flush
+ entityManager.clear();
+
+ } catch (PersistenceException e) {
+ LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
+
+ entityManager.close();
+
+ throw e;
+ }
return answer;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
index 1bf2ec6..370cdf9 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
@@ -22,6 +22,7 @@ import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.NamedQuery;
import javax.persistence.OneToOne;
+import javax.persistence.Version;
/**
* @version
@@ -39,6 +40,9 @@ public class Customer {
private Address address;
private int orderCount;
+ @Version
+ private Long version;
+
public Long getId() {
return id;
}
@@ -74,7 +78,7 @@ public class Customer {
@Override
public String toString() {
// OpenJPA warns about fields being accessed directly in methods if NOT using the corresponding getters.
- return "Customer[id: " + getId() + ", name: " + getName() + ", address: " + getAddress() + "]";
+ return "Customer[id: " + getId() + ", version: " + version + ", name: " + getName() + ", address: " + getAddress() + "]";
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
index fd0aac5..09d4af0 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.examples.SendEmail;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.openjpa.persistence.util.SourceCode;
import org.junit.After;
import org.junit.Before;
import org.springframework.context.ApplicationContext;
@@ -80,10 +81,25 @@ public abstract class AbstractJpaTest extends CamelTestSupport {
}
protected void assertEntityInDB(int size) throws Exception {
- List<?> list = entityManager.createQuery(selectAllString()).getResultList();
- assertEquals(size, list.size());
+ assertEntityInDB(size, SendEmail.class);
+ }
+
+ protected void assertEntityInDB(int size, Class entityType) {
+ List<?> results = entityManager.createQuery("select o from " + entityType.getName() + " o").getResultList();
+ assertEquals(size, results.size());
- assertIsInstanceOf(SendEmail.class, list.get(0));
+ assertIsInstanceOf(entityType, results.get(0));
+ }
+
+ protected void saveEntityInDB(final Object entity) {
+ transactionTemplate.execute(new TransactionCallback<Object>() {
+ public Object doInTransaction(TransactionStatus status) {
+ entityManager.joinTransaction();
+ entityManager.persist(entity);
+ entityManager.flush();
+ return null;
+ }
+ });
}
protected abstract String routeXml();
http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
index 89c95db..89b3a47 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
@@ -16,50 +16,42 @@
*/
package org.apache.camel.processor.jpa;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.examples.Customer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spring.SpringRouteBuilder;
+import org.junit.Before;
import org.junit.Test;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
+import javax.persistence.OptimisticLockException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x";
- protected void save(final Customer customer) {
- transactionTemplate.execute(new TransactionCallback<Object>() {
- public Object doInTransaction(TransactionStatus status) {
- entityManager.joinTransaction();
- entityManager.persist(customer);
- entityManager.flush();
- return null;
- }
- });
- }
-
- protected void assertEntitiesInDatabase(int count, String entity) {
- List<?> results = entityManager.createQuery("select o from " + entity + " o").getResultList();
- assertEquals(count, results.size());
- }
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
- @Test
- public void testPollingConsumerHandlesLockedEntity() throws Exception {
Customer customer = new Customer();
customer.setName("Donald Duck");
- save(customer);
+ saveEntityInDB(customer);
Customer customer2 = new Customer();
customer2.setName("Goofy");
- save(customer2);
+ saveEntityInDB(customer2);
- assertEntitiesInDatabase(2, Customer.class.getName());
+ assertEntityInDB(2, Customer.class);
+ }
+
+ @Test
+ public void testPollingConsumerWithLock() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
+ MockEndpoint mock = getMockEndpoint("mock:locked");
mock.expectedBodiesReceived(
"orders: 1",
"orders: 2"
@@ -68,8 +60,27 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
Map<String, Object> headers = new HashMap<>();
headers.put("name", "Donald%");
- template.asyncRequestBodyAndHeaders("direct:start", "message", headers);
- template.asyncRequestBodyAndHeaders("direct:start", "message", headers);
+ template.asyncRequestBodyAndHeaders("direct:locked", "message", headers);
+ template.asyncRequestBodyAndHeaders("direct:locked", "message", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testPollingConsumerWithoutLock() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:not-locked");
+ MockEndpoint errMock = getMockEndpoint("mock:error");
+
+ mock.expectedBodiesReceived("orders: 1");
+
+ errMock.expectedMessageCount(1);
+ errMock.message(0).body().isInstanceOf(OptimisticLockException.class);
+
+ Map<String, Object> headers = new HashMap<>();
+ headers.put("name", "Donald%");
+
+ template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers);
+ template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers);
assertMockEndpointsSatisfied();
}
@@ -78,18 +89,39 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
protected RouteBuilder createRouteBuilder() {
return new SpringRouteBuilder() {
public void configure() {
- from("direct:start")
- .transacted()
- .pollEnrich().simple("jpa://" + Customer.class.getName() + "?joinTransaction=true&consumeLockEntity=true&query=select c from Customer c where c.name like '${header.name}'")
- .aggregationStrategy((originalExchange, jpaExchange) -> {
+
+ AggregationStrategy enrichStrategy = new AggregationStrategy() {
+ @Override
+ public Exchange aggregate(Exchange originalExchange, Exchange jpaExchange) {
Customer customer = jpaExchange.getIn().getBody(Customer.class);
customer.setOrderCount(customer.getOrderCount()+1);
return jpaExchange;
- })
- .to("jpa://" + Customer.class.getName() + "?joinTransaction=true&usePassedInEntityManager=true")
+ }
+ };
+
+ onException(Exception.class)
+ .setBody().simple("${exception}")
+ .to("mock:error")
+ .handled(true);
+
+ from("direct:locked")
+ .onException(OptimisticLockException.class)
+ .redeliveryDelay(60)
+ .maximumRedeliveries(2)
+ .end()
+ .pollEnrich().simple("jpa://" + Customer.class.getName() + "?lockModeType=OPTIMISTIC_FORCE_INCREMENT&query=select c from Customer c where c.name like '${header.name}'")
+ .aggregationStrategy(enrichStrategy)
+ .to("jpa://" + Customer.class.getName())
+ .setBody().simple("orders: ${body.orderCount}")
+ .to("mock:locked");
+
+ from("direct:not-locked")
+ .pollEnrich().simple("jpa://" + Customer.class.getName() + "?query=select c from Customer c where c.name like '${header.name}'")
+ .aggregationStrategy(enrichStrategy)
+ .to("jpa://" + Customer.class.getName())
.setBody().simple("orders: ${body.orderCount}")
- .to("mock:result");
+ .to("mock:not-locked");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
index 5b72211..a161a71 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
@@ -23,23 +23,10 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.examples.Customer;
import org.apache.camel.spring.SpringRouteBuilder;
import org.junit.Test;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
public class JpaPollingConsumerTest extends AbstractJpaTest {
protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x";
- protected void save(final Customer customer) {
- transactionTemplate.execute(new TransactionCallback<Object>() {
- public Object doInTransaction(TransactionStatus status) {
- entityManager.joinTransaction();
- entityManager.persist(customer);
- entityManager.flush();
- return null;
- }
- });
- }
-
protected void assertEntitiesInDatabase(int count, String entity) {
List<?> results = entityManager.createQuery("select o from " + entity + " o").getResultList();
assertEquals(count, results.size());
@@ -49,10 +36,10 @@ public class JpaPollingConsumerTest extends AbstractJpaTest {
public void testPollingConsumer() throws Exception {
Customer customer = new Customer();
customer.setName("Donald Duck");
- save(customer);
+ saveEntityInDB(customer);
Customer customer2 = new Customer();
customer2.setName("Goofy");
- save(customer2);
+ saveEntityInDB(customer2);
assertEntitiesInDatabase(2, Customer.class.getName());