You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/21 12:01:00 UTC
[2/2] camel git commit: CAMEL-9346: camel-sql - Add transacted option
CAMEL-9346: camel-sql - Add transacted option
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/584725f4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/584725f4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/584725f4
Branch: refs/heads/camel-2.16.x
Commit: 584725f481eb137e4e4a6e1cde8f80cc7f232115
Parents: e34882f
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Nov 21 12:00:14 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Nov 21 12:00:42 2015 +0100
----------------------------------------------------------------------
.../camel/component/sql/DefaultSqlEndpoint.java | 15 +++++++++++++++
.../org/apache/camel/component/sql/SqlConsumer.java | 11 +++++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 2de1d64..0f7d4ce 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -43,6 +43,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
private String dataSourceRef;
@UriParam(description = "Sets the DataSource to use to communicate with the database.")
private DataSource dataSource;
+ @UriParam(label = "consumer", description = "Enables or disables transaction. If enabled then if processing an exchange failed then the consumer"
+ + "break out processing any further exchanges to cause a rollback eager.")
+ private boolean transacted;
@UriParam(label = "producer", description = "Enables or disables batch mode")
private boolean batch;
@UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
@@ -125,6 +128,18 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
this.jdbcTemplate = jdbcTemplate;
}
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ /**
+ * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer
+ + break out processing any further exchanges to cause a rollback eager
+ */
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
public boolean isBatch() {
return batch;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index ce8135a..40e0eb9 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -27,6 +27,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.RollbackExchangeException;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -207,6 +208,16 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
exchange.setException(e);
}
+ if (getEndpoint().isTransacted() && exchange.isFailed()) {
+ // break out as we are transacted and should rollback
+ Exception cause = exchange.getException();
+ if (cause != null) {
+ throw cause;
+ } else {
+ throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+ }
+ }
+
// pick the on consume to use
String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
try {