You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/03/28 12:27:27 UTC
svn commit: r642203 - in /activemq/camel/trunk/components/camel-bam/src:
main/java/org/apache/camel/bam/processor/ test/java/org/apache/camel/bam/
Author: davsclaus
Date: Fri Mar 28 04:27:18 2008
New Revision: 642203
URL: http://svn.apache.org/viewvc?rev=642203&view=rev
Log:
CAMEL-173
- fixed a concurrency issue when creating a new entity: entity creation is now guarded by a java.util.concurrent.locks.ReentrantLock instead of retrying on duplicate-row errors.
- the camel-bam-example demo app no longer throws exceptions either.
- added some assertions to the unit test
Modified:
activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java
activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java
activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
Modified: activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/BamProcessorSupport.java Fri Mar 28 04:27:18 2008
@@ -19,8 +19,6 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import javax.persistence.EntityExistsException;
-
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -28,8 +26,6 @@
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.orm.jpa.JpaSystemException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
@@ -44,10 +40,8 @@
public abstract class BamProcessorSupport<T> implements Processor {
private static final transient Log LOG = LogFactory.getLog(BamProcessorSupport.class);
private Class<T> entityType;
- private Class primaryKeyType = String.class;
private Expression<Exchange> correlationKeyExpression;
private TransactionTemplate transactionTemplate;
- private int maximumRetries = 30;
protected BamProcessorSupport(TransactionTemplate transactionTemplate,
Expression<Exchange> correlationKeyExpression) {
@@ -78,55 +72,24 @@
}
public void process(final Exchange exchange) {
- Object entity = null;
- for (int i = 0; entity == null && i < maximumRetries; i++) {
- if (i > 0) {
- LOG.info("Retry attempt due to duplicate row: " + i);
- }
- entity = transactionTemplate.execute(new TransactionCallback() {
- public Object doInTransaction(TransactionStatus status) {
- try {
- Object key = getCorrelationKey(exchange);
-
- T entity = loadEntity(exchange, key);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Correlation key: " + key + " with entity: " + entity);
- }
- processEntity(exchange, entity);
-
- return entity;
- } catch (JpaSystemException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Likely exception is due to duplicate row in concurrent setting: " + e,
- e);
- }
- LOG.info("Attempt to insert duplicate row due to concurrency issue, so retrying: "
- + e);
- return retryDueToDuplicate(status);
- } catch (DataIntegrityViolationException e) {
- Throwable throwable = e.getCause();
- if (throwable instanceof EntityExistsException) {
- LOG
- .info("Attempt to insert duplicate row due to concurrency issue, so retrying: "
- + throwable);
- return retryDueToDuplicate(status);
- }
- return onError(status, throwable);
- } catch (Throwable e) {
- return onError(status, e);
- }
- }
- });
- }
- }
+ Object entity = transactionTemplate.execute(new TransactionCallback() {
+ public Object doInTransaction(TransactionStatus status) {
+ try {
+ Object key = getCorrelationKey(exchange);
- public int getMaximumRetries() {
- return maximumRetries;
- }
+ T entity = loadEntity(exchange, key);
- public void setMaximumRetries(int maximumRetries) {
- this.maximumRetries = maximumRetries;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Correlation key: " + key + " with entity: " + entity);
+ }
+ processEntity(exchange, entity);
+
+ return entity;
+ } catch (Exception e) {
+ return onError(status, e);
+ }
+ }
+ });
}
// Properties
@@ -159,14 +122,10 @@
return value;
}
- protected Object retryDueToDuplicate(TransactionStatus status) {
- status.setRollbackOnly();
- return null;
- }
-
protected Object onError(TransactionStatus status, Throwable e) {
status.setRollbackOnly();
LOG.error("Caught: " + e, e);
throw new RuntimeCamelException(e);
}
+
}
Modified: activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/processor/JpaBamProcessorSupport.java Fri Mar 28 04:27:18 2008
@@ -18,6 +18,8 @@
import java.lang.reflect.Method;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -40,6 +42,7 @@
public class JpaBamProcessorSupport<T> extends BamProcessorSupport<T> {
private static final transient Log LOG = LogFactory.getLog(JpaBamProcessorSupport.class);
+ private static final Lock lock = new ReentrantLock(); // lock used for concurrency issues
private ActivityRules activityRules;
private JpaTemplate template;
private String findByKeyQuery;
@@ -107,23 +110,27 @@
// Implementatiom methods
// -----------------------------------------------------------------------
protected T loadEntity(Exchange exchange, Object key) throws Exception {
- T entity = findEntityByCorrelationKey(key);
- if (entity == null) {
- entity = createEntity(exchange, key);
- setKeyProperty(entity, key);
- ProcessDefinition definition = ProcessDefinition
- .getRefreshedProcessDefinition(template, getActivityRules().getProcessRules()
- .getProcessDefinition());
- setProcessDefinitionProperty(entity, definition);
- template.persist(entity);
-
- // Now we must flush to avoid concurrent updates clashing trying to
- // insert the
- // same row
- LOG.debug("About to flush on entity: " + entity + " with key: " + key);
- template.flush();
+ lock.lock();
+ try {
+ T entity = findEntityByCorrelationKey(key);
+ if (entity == null) {
+ entity = createEntity(exchange, key);
+ setKeyProperty(entity, key);
+ ProcessDefinition definition = ProcessDefinition
+ .getRefreshedProcessDefinition(template, getActivityRules().getProcessRules()
+ .getProcessDefinition());
+ setProcessDefinitionProperty(entity, definition);
+ template.persist(entity);
+
+ // Now we must flush to avoid concurrent updates clashing trying to
+ // insert the same row
+ LOG.debug("About to flush on entity: " + entity + " with key: " + key);
+ template.flush();
+ }
+ return entity;
+ } finally {
+ lock.unlock();
}
- return entity;
}
protected T findEntityByCorrelationKey(Object key) {
Modified: activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java?rev=642203&r1=642202&r2=642203&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java Fri Mar 28 04:27:18 2008
@@ -17,6 +17,7 @@
package org.apache.camel.bam;
+import org.apache.camel.bam.model.ActivityState;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
@@ -41,6 +42,12 @@
template.sendBody("direct:a", "<hello id='123'>world!</hello>");
overdueEndpoint.assertIsSatisfied();
+
+ // it was b that was the problem and thus send to the overdue endpoint
+ ActivityState state = overdueEndpoint.getExchanges().get(0).getIn().getBody(ActivityState.class);
+ assertNotNull(state);
+ assertEquals("123", state.getCorrelationKey());
+ assertEquals("b", state.getActivityDefinition().getName());
}
protected ClassPathXmlApplicationContext createApplicationContext() {
@@ -83,4 +90,5 @@
protected int getExpectedRouteCount() {
return 0;
}
+
}