You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/30 14:49:39 UTC

[10/12] camel git commit: CAMEL-10490 Changed to optimistic lock mode type and added test for concurrent access without lock

CAMEL-10490 Changed to optimistic lock mode type and added test for concurrent access without lock


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c7db1ab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c7db1ab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c7db1ab

Branch: refs/heads/camel-2.18.x
Commit: 9c7db1abaa6f956f7b577dda7ad966bcba44d85f
Parents: 40c6fa3
Author: Bob Gaudaen <bo...@gmail.com>
Authored: Wed Nov 30 13:39:21 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Nov 30 15:48:52 2016 +0100

----------------------------------------------------------------------
 .../camel/component/jpa/JpaPollingConsumer.java | 38 +++++---
 .../org/apache/camel/examples/Customer.java     |  6 +-
 .../camel/processor/jpa/AbstractJpaTest.java    | 22 ++++-
 .../jpa/JpaPollingConsumerLockEntityTest.java   | 98 +++++++++++++-------
 .../processor/jpa/JpaPollingConsumerTest.java   | 17 +---
 5 files changed, 116 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
index f787393..8b085ad 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
@@ -29,6 +29,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Query;
 import javax.persistence.LockModeType;
+import javax.persistence.PersistenceException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.PollingConsumerSupport;
@@ -144,21 +145,32 @@ public class JpaPollingConsumer extends PollingConsumerSupport {
                 LOG.trace("Created query {}", query);
 
                 Object answer;
-                List<?> results = query.getResultList();
 
-                if (results != null && results.size() == 1) {
-                    // we only have 1 entity so return that
-                    answer = results.get(0);
-                } else {
-                    // we have more data so return a list
-                    answer = results;
-                }
+                try {
+                    List<?> results = query.getResultList();
+
+                    if (results != null && results.size() == 1) {
+                        // we only have 1 entity so return that
+                        answer = results.get(0);
+                    } else {
+                        // we have more data so return a list
+                        answer = results;
+                    }
+
+                    // commit
+                    LOG.debug("Flushing EntityManager");
+                    entityManager.flush();
 
-                // commit
-                LOG.debug("Flushing EntityManager");
-                entityManager.flush();
-                // must clear after flush
-                entityManager.clear();
+                    // must clear after flush
+                    entityManager.clear();
+
+                } catch (PersistenceException e) {
+                    LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
+
+                    entityManager.close();
+
+                    throw e;
+                }
 
                 return answer;
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
index 1bf2ec6..370cdf9 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java
@@ -22,6 +22,7 @@ import javax.persistence.GeneratedValue;
 import javax.persistence.Id;
 import javax.persistence.NamedQuery;
 import javax.persistence.OneToOne;
+import javax.persistence.Version;
 
 /**
  * @version 
@@ -39,6 +40,9 @@ public class Customer {
     private Address address;
     private int orderCount;
 
+    @Version
+    private Long version;
+
     public Long getId() {
         return id;
     }
@@ -74,7 +78,7 @@ public class Customer {
     @Override
     public String toString() {
         // OpenJPA warns about fields being accessed directly in methods if NOT using the corresponding getters.
-        return "Customer[id: " + getId() + ", name: " + getName() + ", address: " + getAddress() + "]";
+        return "Customer[id: " + getId() + ", version: " + version + ", name: " + getName() + ", address: " + getAddress() + "]";
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
index fd0aac5..09d4af0 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.examples.SendEmail;
 import org.apache.camel.spring.SpringCamelContext;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.openjpa.persistence.util.SourceCode;
 import org.junit.After;
 import org.junit.Before;
 import org.springframework.context.ApplicationContext;
@@ -80,10 +81,25 @@ public abstract class AbstractJpaTest extends CamelTestSupport {
     }
 
     protected void assertEntityInDB(int size) throws Exception {
-        List<?> list = entityManager.createQuery(selectAllString()).getResultList();
-        assertEquals(size, list.size());
+        assertEntityInDB(size, SendEmail.class);
+    }
+
+    protected void assertEntityInDB(int size, Class entityType) {
+        List<?> results = entityManager.createQuery("select o from " + entityType.getName() + " o").getResultList();
+        assertEquals(size, results.size());
 
-        assertIsInstanceOf(SendEmail.class, list.get(0));
+        assertIsInstanceOf(entityType, results.get(0));
+    }
+
+    protected void saveEntityInDB(final Object entity) {
+        transactionTemplate.execute(new TransactionCallback<Object>() {
+            public Object doInTransaction(TransactionStatus status) {
+                entityManager.joinTransaction();
+                entityManager.persist(entity);
+                entityManager.flush();
+                return null;
+            }
+        });
     }
     
     protected abstract String routeXml();

http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
index 89c95db..89b3a47 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java
@@ -16,50 +16,42 @@
  */
 package org.apache.camel.processor.jpa;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.examples.Customer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spring.SpringRouteBuilder;
+import org.junit.Before;
 import org.junit.Test;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
 
+import javax.persistence.OptimisticLockException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
     protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x";
 
-    protected void save(final Customer customer) {
-        transactionTemplate.execute(new TransactionCallback<Object>() {
-            public Object doInTransaction(TransactionStatus status) {
-                entityManager.joinTransaction();
-                entityManager.persist(customer);
-                entityManager.flush();
-                return null;
-            }
-        });
-    }
-
-    protected void assertEntitiesInDatabase(int count, String entity) {
-        List<?> results = entityManager.createQuery("select o from " + entity + " o").getResultList();
-        assertEquals(count, results.size());
-    }
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
 
-    @Test
-    public void testPollingConsumerHandlesLockedEntity() throws Exception {
         Customer customer = new Customer();
         customer.setName("Donald Duck");
-        save(customer);
+        saveEntityInDB(customer);
 
         Customer customer2 = new Customer();
         customer2.setName("Goofy");
-        save(customer2);
+        saveEntityInDB(customer2);
 
-        assertEntitiesInDatabase(2, Customer.class.getName());
+        assertEntityInDB(2, Customer.class);
+    }
+
+    @Test
+    public void testPollingConsumerWithLock() throws Exception {
 
-        MockEndpoint mock = getMockEndpoint("mock:result");
+        MockEndpoint mock = getMockEndpoint("mock:locked");
         mock.expectedBodiesReceived(
             "orders: 1",
             "orders: 2"
@@ -68,8 +60,27 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
         Map<String, Object> headers = new HashMap<>();
         headers.put("name", "Donald%");
 
-        template.asyncRequestBodyAndHeaders("direct:start", "message", headers);
-        template.asyncRequestBodyAndHeaders("direct:start", "message", headers);
+        template.asyncRequestBodyAndHeaders("direct:locked", "message", headers);
+        template.asyncRequestBodyAndHeaders("direct:locked", "message", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testPollingConsumerWithoutLock() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:not-locked");
+        MockEndpoint errMock = getMockEndpoint("mock:error");
+
+        mock.expectedBodiesReceived("orders: 1");
+
+        errMock.expectedMessageCount(1);
+        errMock.message(0).body().isInstanceOf(OptimisticLockException.class);
+
+        Map<String, Object> headers = new HashMap<>();
+        headers.put("name", "Donald%");
+
+        template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers);
+        template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers);
 
         assertMockEndpointsSatisfied();
     }
@@ -78,18 +89,39 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest {
     protected RouteBuilder createRouteBuilder() {
         return new SpringRouteBuilder() {
             public void configure() {
-                from("direct:start")
-                    .transacted()
-                    .pollEnrich().simple("jpa://" + Customer.class.getName() + "?joinTransaction=true&consumeLockEntity=true&query=select c from Customer c where c.name like '${header.name}'")
-                    .aggregationStrategy((originalExchange, jpaExchange) -> {
+
+                AggregationStrategy enrichStrategy = new AggregationStrategy() {
+                    @Override
+                    public Exchange aggregate(Exchange originalExchange, Exchange jpaExchange) {
                         Customer customer = jpaExchange.getIn().getBody(Customer.class);
                         customer.setOrderCount(customer.getOrderCount()+1);
 
                         return jpaExchange;
-                    })
-                    .to("jpa://" + Customer.class.getName() + "?joinTransaction=true&usePassedInEntityManager=true")
+                    }
+                };
+
+                onException(Exception.class)
+                    .setBody().simple("${exception}")
+                    .to("mock:error")
+                    .handled(true);
+
+                from("direct:locked")
+                    .onException(OptimisticLockException.class)
+                        .redeliveryDelay(60)
+                        .maximumRedeliveries(2)
+                    .end()
+                    .pollEnrich().simple("jpa://" + Customer.class.getName() + "?lockModeType=OPTIMISTIC_FORCE_INCREMENT&query=select c from Customer c where c.name like '${header.name}'")
+                    .aggregationStrategy(enrichStrategy)
+                    .to("jpa://" + Customer.class.getName())
+                    .setBody().simple("orders: ${body.orderCount}")
+                    .to("mock:locked");
+
+                from("direct:not-locked")
+                    .pollEnrich().simple("jpa://" + Customer.class.getName() + "?query=select c from Customer c where c.name like '${header.name}'")
+                    .aggregationStrategy(enrichStrategy)
+                    .to("jpa://" + Customer.class.getName())
                     .setBody().simple("orders: ${body.orderCount}")
-                    .to("mock:result");
+                    .to("mock:not-locked");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/9c7db1ab/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
index 5b72211..a161a71 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java
@@ -23,23 +23,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.examples.Customer;
 import org.apache.camel.spring.SpringRouteBuilder;
 import org.junit.Test;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
 
 public class JpaPollingConsumerTest extends AbstractJpaTest {
     protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x";
 
-    protected void save(final Customer customer) {
-        transactionTemplate.execute(new TransactionCallback<Object>() {
-            public Object doInTransaction(TransactionStatus status) {
-                entityManager.joinTransaction();
-                entityManager.persist(customer);
-                entityManager.flush();
-                return null;
-            }
-        });
-    }
-
     protected void assertEntitiesInDatabase(int count, String entity) {
         List<?> results = entityManager.createQuery("select o from " + entity + " o").getResultList();
         assertEquals(count, results.size());
@@ -49,10 +36,10 @@ public class JpaPollingConsumerTest extends AbstractJpaTest {
     public void testPollingConsumer() throws Exception {
         Customer customer = new Customer();
         customer.setName("Donald Duck");
-        save(customer);
+        saveEntityInDB(customer);
         Customer customer2 = new Customer();
         customer2.setName("Goofy");
-        save(customer2);
+        saveEntityInDB(customer2);
 
         assertEntitiesInDatabase(2, Customer.class.getName());