You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/15 16:01:15 UTC

svn commit: r1202225 - in /camel/branches/camel-2.7.x: ./ components/camel-jpa/src/main/java/org/apache/camel/component/jpa/ components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/

Author: davsclaus
Date: Tue Nov 15 15:01:15 2011
New Revision: 1202225

URL: http://svn.apache.org/viewvc?rev=1202225&view=rev
Log:
CAMEL-4683: Added consumer.transacted option to JpaConsumer to control TX behavior.

Added:
    camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
      - copied unchanged from r1202222, camel/branches/camel-2.8.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
Modified:
    camel/branches/camel-2.7.x/   (props changed)
    camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 15:01:15 2011
@@ -1,2 +1,2 @@
-/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171
-/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167
+/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171,1202222
+/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167,1202215

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1202225&r1=1202224&r2=1202225&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Nov 15 15:01:15 2011
@@ -54,6 +54,7 @@ public class JpaConsumer extends Schedul
     private String nativeQuery;
     private Class resultClass;
     private int maxMessagesPerPoll;
+    private boolean transacted;
     private volatile ShutdownRunningTask shutdownRunningTask;
     private volatile int pendingExchanges;
 
@@ -96,17 +97,29 @@ public class JpaConsumer extends Schedul
                     answer.add(holder);
                 }
 
-                int messagePolled;
+                PersistenceException cause = null;
+                int messagePolled = 0;
                 try {
                     messagePolled = processBatch(CastUtils.cast(answer));
                 } catch (Exception e) {
                     if (e instanceof PersistenceException) {
-                        throw (PersistenceException) e;
+                        cause = (PersistenceException) e;
                     } else {
-                        throw new PersistenceException(e);
+                        cause = new PersistenceException(e);
                     }
                 }
 
+                if (cause != null) {
+                    if (!isTransacted()) {
+                        LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
+                        entityManager.flush();
+                    } else {
+                        // rollback all by throwing exception
+                        throw cause;
+                    }
+                }
+
+                // commit
                 LOG.debug("Flushing EntityManager");
                 entityManager.flush();
                 return messagePolled;
@@ -260,7 +273,22 @@ public class JpaConsumer extends Schedul
 
     public void setResultClass(Class resultClass) {
         this.resultClass = resultClass;
-    }    
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets whether to run in transacted mode or not.
+     * <p/>
+     * This option defaults to <tt>false</tt>. When <tt>false</tt>, all the good messages
+     * will commit, and only the first failed message will roll back.
+     * However, when <tt>true</tt>, all messages will roll back if just one message fails.
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------

Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?rev=1202225&r1=1202224&r2=1202225&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Tue Nov 15 15:01:15 2011
@@ -75,7 +75,7 @@ public class JpaTXRollbackTest extends C
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup()
+                from("jpa://" + SendEmail.class.getName() + "?consumer.transacted=true&delay=1000").routeId("foo").noAutoStartup()
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws Exception {