You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/15 16:01:15 UTC
svn commit: r1202225 - in /camel/branches/camel-2.7.x: ./
components/camel-jpa/src/main/java/org/apache/camel/component/jpa/
components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/
Author: davsclaus
Date: Tue Nov 15 15:01:15 2011
New Revision: 1202225
URL: http://svn.apache.org/viewvc?rev=1202225&view=rev
Log:
CAMEL-4683: Added consumer.transacted option to JpaConsumer to control TX behavior.
Added:
camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
- copied unchanged from r1202222, camel/branches/camel-2.8.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
Modified:
camel/branches/camel-2.7.x/ (props changed)
camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 15:01:15 2011
@@ -1,2 +1,2 @@
-/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171
-/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167
+/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171,1202222
+/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167,1202215
Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1202225&r1=1202224&r2=1202225&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Nov 15 15:01:15 2011
@@ -54,6 +54,7 @@ public class JpaConsumer extends Schedul
private String nativeQuery;
private Class resultClass;
private int maxMessagesPerPoll;
+ private boolean transacted;
private volatile ShutdownRunningTask shutdownRunningTask;
private volatile int pendingExchanges;
@@ -96,17 +97,29 @@ public class JpaConsumer extends Schedul
answer.add(holder);
}
- int messagePolled;
+ PersistenceException cause = null;
+ int messagePolled = 0;
try {
messagePolled = processBatch(CastUtils.cast(answer));
} catch (Exception e) {
if (e instanceof PersistenceException) {
- throw (PersistenceException) e;
+ cause = (PersistenceException) e;
} else {
- throw new PersistenceException(e);
+ cause = new PersistenceException(e);
}
}
+ if (cause != null) {
+ if (!isTransacted()) {
+ LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
+ entityManager.flush();
+ } else {
+ // rollback all by throwing exception
+ throw cause;
+ }
+ }
+
+ // commit
LOG.debug("Flushing EntityManager");
entityManager.flush();
return messagePolled;
@@ -260,7 +273,22 @@ public class JpaConsumer extends Schedul
public void setResultClass(Class resultClass) {
this.resultClass = resultClass;
- }
+ }
+
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ /**
+ * Sets whether to run in transacted mode or not.
+ * <p/>
+ * This option defaults to <tt>false</tt>. When <tt>false</tt>, all successfully processed
+ * messages are committed, and only the first failed message is rolled back.
+ * When <tt>true</tt>, all messages are rolled back if just one message fails.
+ */
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
// Implementation methods
// -------------------------------------------------------------------------
Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?rev=1202225&r1=1202224&r2=1202225&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Tue Nov 15 15:01:15 2011
@@ -75,7 +75,7 @@ public class JpaTXRollbackTest extends C
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup()
+ from("jpa://" + SendEmail.class.getName() + "?consumer.transacted=true&delay=1000").routeId("foo").noAutoStartup()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {