You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/21 12:01:00 UTC

[2/2] camel git commit: CAMEL-9346: camel-sql - Add transacted option

CAMEL-9346: camel-sql - Add transacted option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/584725f4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/584725f4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/584725f4

Branch: refs/heads/camel-2.16.x
Commit: 584725f481eb137e4e4a6e1cde8f80cc7f232115
Parents: e34882f
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Nov 21 12:00:14 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Nov 21 12:00:42 2015 +0100

----------------------------------------------------------------------
 .../camel/component/sql/DefaultSqlEndpoint.java      | 15 +++++++++++++++
 .../org/apache/camel/component/sql/SqlConsumer.java  | 11 +++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 2de1d64..0f7d4ce 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -43,6 +43,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
     private String dataSourceRef;
     @UriParam(description = "Sets the DataSource to use to communicate with the database.")
     private DataSource dataSource;
+    @UriParam(label = "consumer", description = "Enables or disables transaction. If enabled then if processing an exchange failed then the consumer"
+            + "break out processing any further exchanges to cause a rollback eager.")
+    private boolean transacted;
     @UriParam(label = "producer", description = "Enables or disables batch mode")
     private boolean batch;
     @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
@@ -125,6 +128,18 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
         this.jdbcTemplate = jdbcTemplate;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer
+     + break out processing any further exchanges to cause a rollback eager
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
     public boolean isBatch() {
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index ce8135a..40e0eb9 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -27,6 +27,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -207,6 +208,16 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
                 exchange.setException(e);
             }
 
+            if (getEndpoint().isTransacted() && exchange.isFailed()) {
+                // break out as we are transacted and should rollback
+                Exception cause = exchange.getException();
+                if (cause != null) {
+                    throw cause;
+                } else {
+                    throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+                }
+            }
+
             // pick the on consume to use
             String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
             try {