You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/03/28 12:27:27 UTC

svn commit: r642203 - in /activemq/camel/trunk/components/camel-bam/src: main/java/org/apache/camel/bam/processor/ test/java/org/apache/camel/bam/

Author: davsclaus
Date: Fri Mar 28 04:27:18 2008
New Revision: 642203

URL: http://svn.apache.org/viewvc?rev=642203&view=rev
Log:
CAMEL-173
- fixed a concurrency issue when creating a new entity: lookup and creation are now guarded by a java.util.concurrent.locks.ReentrantLock.
- the camel-bam-example demo app no longer throws exceptions either.
- added some assertions to the unit test

Modified:
    activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java
    activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java
    activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java

Modified: activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java Fri Mar 28 04:27:18 2008
@@ -19,8 +19,6 @@
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 
-import javax.persistence.EntityExistsException;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -28,8 +26,6 @@
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.orm.jpa.JpaSystemException;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -44,10 +40,8 @@
 public abstract class BamProcessorSupport<T> implements Processor {
     private static final transient Log LOG = LogFactory.getLog(BamProcessorSupport.class);
     private Class<T> entityType;
-    private Class primaryKeyType = String.class;
     private Expression<Exchange> correlationKeyExpression;
     private TransactionTemplate transactionTemplate;
-    private int maximumRetries = 30;
 
     protected BamProcessorSupport(TransactionTemplate transactionTemplate,
                                   Expression<Exchange> correlationKeyExpression) {
@@ -78,55 +72,24 @@
     }
 
     public void process(final Exchange exchange) {
-        Object entity = null;
-        for (int i = 0; entity == null && i < maximumRetries; i++) {
-            if (i > 0) {
-                LOG.info("Retry attempt due to duplicate row: " + i);
-            }
-            entity = transactionTemplate.execute(new TransactionCallback() {
-                public Object doInTransaction(TransactionStatus status) {
-                    try {
-                        Object key = getCorrelationKey(exchange);
-
-                        T entity = loadEntity(exchange, key);
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Correlation key: " + key + " with entity: " + entity);
-                        }
-                        processEntity(exchange, entity);
-
-                        return entity;
-                    } catch (JpaSystemException e) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Likely exception is due to duplicate row in concurrent setting: " + e,
-                                      e);
-                        }
-                        LOG.info("Attempt to insert duplicate row due to concurrency issue, so retrying: "
-                                 + e);
-                        return retryDueToDuplicate(status);
-                    } catch (DataIntegrityViolationException e) {
-                        Throwable throwable = e.getCause();
-                        if (throwable instanceof EntityExistsException) {
-                            LOG
-                                .info("Attempt to insert duplicate row due to concurrency issue, so retrying: "
-                                      + throwable);
-                            return retryDueToDuplicate(status);
-                        }
-                        return onError(status, throwable);
-                    } catch (Throwable e) {
-                        return onError(status, e);
-                    }
-                }
-            });
-        }
-    }
+        Object entity = transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus status) {
+                try {
+                    Object key = getCorrelationKey(exchange);
 
-    public int getMaximumRetries() {
-        return maximumRetries;
-    }
+                    T entity = loadEntity(exchange, key);
 
-    public void setMaximumRetries(int maximumRetries) {
-        this.maximumRetries = maximumRetries;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Correlation key: " + key + " with entity: " + entity);
+                    }
+                    processEntity(exchange, entity);
+
+                    return entity;
+                } catch (Exception e) {
+                    return onError(status, e);
+                }
+            }
+        });
     }
 
     // Properties
@@ -159,14 +122,10 @@
         return value;
     }
 
-    protected Object retryDueToDuplicate(TransactionStatus status) {
-        status.setRollbackOnly();
-        return null;
-    }
-
     protected Object onError(TransactionStatus status, Throwable e) {
         status.setRollbackOnly();
         LOG.error("Caught: " + e, e);
         throw new RuntimeCamelException(e);
     }
+    
 }

Modified: activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java Fri Mar 28 04:27:18 2008
@@ -18,6 +18,8 @@
 
 import java.lang.reflect.Method;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -40,6 +42,7 @@
 public class JpaBamProcessorSupport<T> extends BamProcessorSupport<T> {
     private static final transient Log LOG = LogFactory.getLog(JpaBamProcessorSupport.class);
 
+    private static final Lock lock = new ReentrantLock(); // lock used for concurrency issues
     private ActivityRules activityRules;
     private JpaTemplate template;
     private String findByKeyQuery;
@@ -107,23 +110,27 @@
     // Implementatiom methods
     // -----------------------------------------------------------------------
     protected T loadEntity(Exchange exchange, Object key) throws Exception {
-        T entity = findEntityByCorrelationKey(key);
-        if (entity == null) {
-            entity = createEntity(exchange, key);
-            setKeyProperty(entity, key);
-            ProcessDefinition definition = ProcessDefinition
-                .getRefreshedProcessDefinition(template, getActivityRules().getProcessRules()
-                    .getProcessDefinition());
-            setProcessDefinitionProperty(entity, definition);
-            template.persist(entity);
-
-            // Now we must flush to avoid concurrent updates clashing trying to
-            // insert the
-            // same row
-            LOG.debug("About to flush on entity: " + entity + " with key: " + key);
-            template.flush();
+        lock.lock();
+        try {
+            T entity = findEntityByCorrelationKey(key);
+            if (entity == null) {
+                entity = createEntity(exchange, key);
+                setKeyProperty(entity, key);
+                ProcessDefinition definition = ProcessDefinition
+                    .getRefreshedProcessDefinition(template, getActivityRules().getProcessRules()
+                        .getProcessDefinition());
+                setProcessDefinitionProperty(entity, definition);
+                template.persist(entity);
+
+                // Now we must flush to avoid concurrent updates clashing trying to
+                // insert the same row
+                LOG.debug("About to flush on entity: " + entity + " with key: " + key);
+                template.flush();
+            }
+            return entity;
+        } finally {
+            lock.unlock();
         }
-        return entity;
     }
 
     protected T findEntityByCorrelationKey(Object key) {

Modified: activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java Fri Mar 28 04:27:18 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.bam;
 
 
+import org.apache.camel.bam.model.ActivityState;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
@@ -41,6 +42,12 @@
         template.sendBody("direct:a", "<hello id='123'>world!</hello>");
 
         overdueEndpoint.assertIsSatisfied();
+
+        // it was b that was the problem and thus send to the overdue endpoint
+        ActivityState state = overdueEndpoint.getExchanges().get(0).getIn().getBody(ActivityState.class);
+        assertNotNull(state);
+        assertEquals("123", state.getCorrelationKey());
+        assertEquals("b", state.getActivityDefinition().getName());
     }
 
     protected ClassPathXmlApplicationContext createApplicationContext() {
@@ -83,4 +90,5 @@
     protected int getExpectedRouteCount() {
         return 0;
     }
+
 }