You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/03 14:31:41 UTC
svn commit: r525148 -
/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Author: jstrachan
Date: Tue Apr 3 05:31:40 2007
New Revision: 525148
URL: http://svn.apache.org/viewvc?view=rev&rev=525148
Log:
use explicit exclusive lock when processing component
Modified:
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=525148&r1=525147&r2=525148
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Apr 3 05:31:40 2007
@@ -24,8 +24,9 @@
import org.apache.commons.logging.LogFactory;
import javax.persistence.EntityManager;
-import javax.persistence.Query;
import javax.persistence.EntityTransaction;
+import javax.persistence.LockModeType;
+import javax.persistence.Query;
import java.util.List;
/**
@@ -33,7 +34,6 @@
*/
public class JpaConsumer extends PollingConsumer<Exchange> {
private static final transient Log log = LogFactory.getLog(JpaConsumer.class);
-
private final JpaEndpoint endpoint;
private final EntityManager entityManager;
private QueryFactory queryFactory;
@@ -62,10 +62,13 @@
if (log.isDebugEnabled()) {
log.debug("Processing new entity: " + result);
}
- // lets turn the result into an exchange and fire it into the processor
- Exchange exchange = createExchange(result);
- getProcessor().onExchange(exchange);
- getDeleteHandler().deleteObject(this, result);
+
+ if (lockEntity(result)) {
+ // lets turn the result into an exchange and fire it into the processor
+ Exchange exchange = createExchange(result);
+ getProcessor().onExchange(exchange);
+ getDeleteHandler().deleteObject(this, result);
+ }
}
transaction.commit();
@@ -121,6 +124,28 @@
}
entityManager.close();
super.doStop();
+ }
+
+ /**
+ * A strategy method to lock an object with an exclusive lock so that it can be processed
+ *
+ * @param entity the entity to be locked
+ * @return true if the entity was locked
+ */
+ protected boolean lockEntity(Object entity) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Acquiring exclusive lock on entity: " + entity);
+ }
+ entityManager.lock(entity, LockModeType.WRITE);
+ return true;
+ }
+ catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
+ }
+ return false;
+ }
}
protected QueryFactory createQueryFactory() {