You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/03 14:31:41 UTC

svn commit: r525148 - /activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java

Author: jstrachan
Date: Tue Apr  3 05:31:40 2007
New Revision: 525148

URL: http://svn.apache.org/viewvc?view=rev&rev=525148
Log:
use explicit exclusive lock when processing component

Modified:
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=525148&r1=525147&r2=525148
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Apr  3 05:31:40 2007
@@ -24,8 +24,9 @@
 import org.apache.commons.logging.LogFactory;
 
 import javax.persistence.EntityManager;
-import javax.persistence.Query;
 import javax.persistence.EntityTransaction;
+import javax.persistence.LockModeType;
+import javax.persistence.Query;
 import java.util.List;
 
 /**
@@ -33,7 +34,6 @@
  */
 public class JpaConsumer extends PollingConsumer<Exchange> {
     private static final transient Log log = LogFactory.getLog(JpaConsumer.class);
-
     private final JpaEndpoint endpoint;
     private final EntityManager entityManager;
     private QueryFactory queryFactory;
@@ -62,10 +62,13 @@
                 if (log.isDebugEnabled()) {
                     log.debug("Processing new entity: " + result);
                 }
-                // lets turn the result into an exchange and fire it into the processor
-                Exchange exchange = createExchange(result);
-                getProcessor().onExchange(exchange);
-                getDeleteHandler().deleteObject(this, result);
+
+                if (lockEntity(result)) {
+                    // lets turn the result into an exchange and fire it into the processor
+                    Exchange exchange = createExchange(result);
+                    getProcessor().onExchange(exchange);
+                    getDeleteHandler().deleteObject(this, result);
+                }
             }
 
             transaction.commit();
@@ -121,6 +124,28 @@
         }
         entityManager.close();
         super.doStop();
+    }
+
+    /**
+     * A strategy method to lock an object with an exclusive lock so that it can be processed
+     *
+     * @param entity the entity to be locked
+     * @return true if the entity was locked
+     */
+    protected boolean lockEntity(Object entity) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Acquiring exclusive lock on entity: " + entity);
+            }
+            entityManager.lock(entity, LockModeType.WRITE);
+            return true;
+        }
+        catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
+            }
+            return false;
+        }
     }
 
     protected QueryFactory createQueryFactory() {