You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2017/04/13 14:52:37 UTC
[5/7] camel git commit: CAMEL-10685: Change Camel CDI transaction
package name
CAMEL-10685: Change Camel CDI transaction package name
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6aecd51c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6aecd51c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6aecd51c
Branch: refs/heads/master
Commit: 6aecd51c33b40b91ea7450f8c20da0803db42870
Parents: 8092e89
Author: Antonin Stefanutti <an...@stefanutti.fr>
Authored: Thu Apr 13 15:10:54 2017 +0200
Committer: Antonin Stefanutti <an...@stefanutti.fr>
Committed: Thu Apr 13 16:52:19 2017 +0200
----------------------------------------------------------------------
.../org/apache/camel/cdi/JtaRouteBuilder.java | 2 +-
.../cdi/jta/JtaTransactionErrorHandler.java | 48 ---
.../jta/JtaTransactionErrorHandlerBuilder.java | 164 --------
.../camel/cdi/jta/JtaTransactionPolicy.java | 136 -------
.../cdi/jta/MandatoryJtaTransactionPolicy.java | 18 -
.../cdi/jta/NestedJtaTransactionPolicy.java | 44 ---
.../cdi/jta/NeverJtaTransactionPolicy.java | 18 -
.../jta/NotSupportedJtaTransactionPolicy.java | 24 --
.../cdi/jta/RequiredJtaTransactionPolicy.java | 15 -
.../jta/RequiresNewJtaTransactionPolicy.java | 24 --
.../cdi/jta/SupportsJtaTransactionPolicy.java | 15 -
.../camel/cdi/jta/TransactedDefinition.java | 18 -
.../camel/cdi/jta/TransactionErrorHandler.java | 370 -------------------
.../jta/TransactionalJtaTransactionPolicy.java | 121 ------
.../transaction/JtaTransactionErrorHandler.java | 48 +++
.../JtaTransactionErrorHandlerBuilder.java | 164 ++++++++
.../cdi/transaction/JtaTransactionPolicy.java | 136 +++++++
.../MandatoryJtaTransactionPolicy.java | 18 +
.../transaction/NestedJtaTransactionPolicy.java | 44 +++
.../transaction/NeverJtaTransactionPolicy.java | 18 +
.../NotSupportedJtaTransactionPolicy.java | 24 ++
.../RequiredJtaTransactionPolicy.java | 15 +
.../RequiresNewJtaTransactionPolicy.java | 24 ++
.../SupportsJtaTransactionPolicy.java | 15 +
.../cdi/transaction/TransactedDefinition.java | 18 +
.../transaction/TransactionErrorHandler.java | 370 +++++++++++++++++++
.../TransactionalJtaTransactionPolicy.java | 121 ++++++
27 files changed, 1016 insertions(+), 1016 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
index 838a8a1..0791954 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
@@ -1,7 +1,7 @@
package org.apache.camel.cdi;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.cdi.jta.JtaTransactionErrorHandlerBuilder;
+import org.apache.camel.cdi.transaction.JtaTransactionErrorHandlerBuilder;
/**
* An extension of the {@link RouteBuilder} to provide some additional helper
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
deleted file mode 100644
index 8a7f0d2..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.Predicate;
-import org.apache.camel.Processor;
-import org.apache.camel.processor.RedeliveryPolicy;
-import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.CamelLogger;
-
-/**
- * This error handler does redelivering. If the transaction fails it can be
- * retried if configured to do so. In the Spring implementation redelivering is
- * done within the transaction which is not appropriate in JTA since every error
- * breaks the current transaction.
- */
-public class JtaTransactionErrorHandler extends org.apache.camel.processor.RedeliveryErrorHandler {
-
- public JtaTransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
- Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
- ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
- Predicate retryWhile, ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel,
- Processor onExceptionOccurredProcessor) {
-
- super(camelContext,
- new TransactionErrorHandler(camelContext,
- output,
- exceptionPolicyStrategy,
- transactionPolicy,
- executorService,
- rollbackLoggingLevel),
- logger,
- redeliveryProcessor,
- redeliveryPolicy,
- null,
- null,
- false,
- false,
- retryWhile,
- executorService,
- null,
- onExceptionOccurredProcessor);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
deleted file mode 100644
index 6977e9d..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.builder.DefaultErrorHandlerBuilder;
-import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.TransactedPolicy;
-import org.apache.camel.util.CamelLogger;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Builds transactional error handlers. This class is based on
- * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}.
- */
-public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
-
- private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
-
- private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
-
- public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = JtaTransactionErrorHandlerBuilder.class.getName()
- + "#rollbackLoggingLevel";
-
- private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
-
- private JtaTransactionPolicy transactionPolicy;
-
- private String policyRef;
-
- @Override
- public boolean supportTransacted() {
- return true;
- }
-
- @Override
- public ErrorHandlerBuilder cloneBuilder() {
-
- final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder();
- cloneBuilder(answer);
- return answer;
-
- }
-
- @Override
- protected void cloneBuilder(DefaultErrorHandlerBuilder other) {
-
- super.cloneBuilder(other);
- if (other instanceof JtaTransactionErrorHandlerBuilder) {
- final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other;
- transactionPolicy = otherTx.transactionPolicy;
- rollbackLoggingLevel = otherTx.rollbackLoggingLevel;
- }
-
- }
-
- public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
-
- // resolve policy reference, if given
- if (transactionPolicy == null) {
-
- if (policyRef != null) {
-
- final TransactedDefinition transactedDefinition = new TransactedDefinition();
- transactedDefinition.setRef(policyRef);
- final Policy policy = transactedDefinition.resolvePolicy(routeContext);
- if (policy != null) {
- if (!(policy instanceof JtaTransactionPolicy)) {
- throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
- + policyRef.getClass().getName() + "' but an instance of '"
- + JtaTransactionPolicy.class.getName() + "' is required!");
- }
- transactionPolicy = (JtaTransactionPolicy) policy;
- }
-
- }
-
- }
-
- // try to lookup default policy
- if (transactionPolicy == null) {
-
- LOG.debug(
- "No tranaction policiy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
-
- Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class);
- if (mapPolicy != null && mapPolicy.size() == 1) {
- TransactedPolicy policy = mapPolicy.values().iterator().next();
- if (policy != null && policy instanceof JtaTransactionPolicy) {
- transactionPolicy = ((JtaTransactionPolicy) policy);
- }
- }
-
- if (transactionPolicy == null) {
- TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class);
- if (policy != null && policy instanceof JtaTransactionPolicy) {
- transactionPolicy = ((JtaTransactionPolicy) policy);
- }
- }
-
- if (transactionPolicy != null) {
- LOG.debug("Found TransactionPolicy in registry to use: " + transactionPolicy);
- }
-
- }
-
- ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
-
- final CamelContext camelContext = routeContext.getCamelContext();
- final Map<String, String> properties = camelContext.getProperties();
- if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
- rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
- }
-
- JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
- processor,
- getLogger(),
- getOnRedelivery(),
- getRedeliveryPolicy(),
- getExceptionPolicyStrategy(),
- transactionPolicy,
- getRetryWhilePolicy(camelContext),
- getExecutorService(camelContext),
- rollbackLoggingLevel,
- getOnExceptionOccurred());
-
- // configure error handler before we can use it
- configure(routeContext, answer);
- return answer;
-
- }
-
- public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
- policyRef = ref;
- return this;
- }
-
- public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
- this.transactionPolicy = transactionPolicy;
- return this;
- }
-
- public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
- this.rollbackLoggingLevel = rollbackLoggingLevel;
- return this;
- }
-
- protected CamelLogger createLogger() {
- return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR);
- }
-
- @Override
- public String toString() {
- return "JtaTransactionErrorHandlerBuilder";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
deleted file mode 100644
index c4c70c2..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.annotation.Resource;
-import javax.transaction.TransactionManager;
-
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.builder.ErrorHandlerBuilderRef;
-import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.TransactedPolicy;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Sets a proper error handler. This class is based on
- * {@link org.apache.camel.spring.spi.SpringTransactionPolicy}.
- * <p>
- * This class requires the resource {@link TransactionManager} to be available
- * through JNDI url "java:/TransactionManager"
- */
-public abstract class JtaTransactionPolicy implements TransactedPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionPolicy.class);
-
- public static interface Runnable {
- void run() throws Throwable;
- }
-
- @Resource(lookup = "java:/TransactionManager")
- protected TransactionManager transactionManager;
-
- @Override
- public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
- // do not inherit since we create our own
- // (otherwise the default error handler would be used two times
- // because we inherit it on our own but only in case of a
- // non-transactional
- // error handler)
- definition.setInheritErrorHandler(false);
- }
-
- public abstract void run(final Runnable runnable) throws Throwable;
-
- @Override
- public Processor wrap(RouteContext routeContext, Processor processor) {
-
- JtaTransactionErrorHandler answer;
-
- // the goal is to configure the error handler builder on the route as a
- // transacted error handler,
- // either its already a transacted or if not we replace it with a
- // transacted one that we configure here
- // and wrap the processor in the transacted error handler as we can have
- // transacted routes that change
- // propagation behavior, eg: from A required -> B -> requiresNew C
- // (advanced use-case)
- // if we should not support this we do not need to wrap the processor as
- // we only need one transacted error handler
-
- // find the existing error handler builder
- ErrorHandlerBuilder builder = (ErrorHandlerBuilder) routeContext.getRoute().getErrorHandlerBuilder();
-
- // check if its a ref if so then do a lookup
- if (builder instanceof ErrorHandlerBuilderRef) {
- // its a reference to a error handler so lookup the reference
- ErrorHandlerBuilderRef builderRef = (ErrorHandlerBuilderRef) builder;
- String ref = builderRef.getRef();
- // only lookup if there was explicit an error handler builder
- // configured
- // otherwise its just the "default" that has not explicit been
- // configured
- // and if so then we can safely replace that with our transacted
- // error handler
- if (ErrorHandlerBuilderRef.isErrorHandlerBuilderConfigured(ref)) {
- LOG.debug("Looking up ErrorHandlerBuilder with ref: {}", ref);
- builder = (ErrorHandlerBuilder) ErrorHandlerBuilderRef.lookupErrorHandlerBuilder(routeContext, ref);
- }
- }
-
- JtaTransactionErrorHandlerBuilder txBuilder;
- if ((builder != null) && builder.supportTransacted()) {
- if (!(builder instanceof JtaTransactionErrorHandlerBuilder)) {
- throw new RuntimeCamelException("The given transactional error handler builder '" + builder
- + "' is not of type '" + JtaTransactionErrorHandlerBuilder.class.getName()
- + "' which is required in this environment!");
- }
- LOG.debug("The ErrorHandlerBuilder configured is a JtaTransactionErrorHandlerBuilder: {}", builder);
- txBuilder = (JtaTransactionErrorHandlerBuilder) builder.cloneBuilder();
- } else {
- LOG.debug(
- "No or no transactional ErrorHandlerBuilder configured, will use default JtaTransactionErrorHandlerBuilder settings");
- txBuilder = new JtaTransactionErrorHandlerBuilder();
- }
-
- txBuilder.setTransactionPolicy(this);
-
- // use error handlers from the configured builder
- if (builder != null) {
- txBuilder.setErrorHandlers(routeContext, builder.getErrorHandlers(routeContext));
- }
-
- answer = createTransactionErrorHandler(routeContext, processor, txBuilder);
- answer.setExceptionPolicy(txBuilder.getExceptionPolicyStrategy());
- // configure our answer based on the existing error handler
- txBuilder.configure(routeContext, answer);
-
- // set the route to use our transacted error handler builder
- routeContext.getRoute().setErrorHandlerBuilder(txBuilder);
-
- // return with wrapped transacted error handler
- return answer;
-
- }
-
- protected JtaTransactionErrorHandler createTransactionErrorHandler(RouteContext routeContext, Processor processor,
- ErrorHandlerBuilder builder) {
-
- JtaTransactionErrorHandler answer;
- try {
- answer = (JtaTransactionErrorHandler) builder.createErrorHandler(routeContext, processor);
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
- return answer;
-
- }
-
- @Override
- public String toString() {
- return getClass().getName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
deleted file mode 100644
index 260ad69..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-
-@Named("PROPAGATION_MANDATORY")
-public class MandatoryJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Exception {
-
- if (!hasActiveTransaction()) {
- throw new IllegalStateException(
- "Policy 'PROPAGATION_MANDATORY' is configured but no active transaction was found!");
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
deleted file mode 100644
index 6ce116a..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Named;
-import javax.transaction.Transaction;
-
-@Named("PROPAGATION_NESTED")
-public class NestedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- private static final Logger logger = Logger.getLogger(NestedJtaTransactionPolicy.class.getCanonicalName());
-
- @Override
- public void run(final Runnable runnable) throws Throwable {
-
- Transaction suspendedTransaction = null;
- boolean rollback = false;
- try {
-
- suspendedTransaction = suspendTransaction();
- runWithTransaction(runnable, true);
-
- } catch (Throwable e) {
- rollback = true;
- throw e;
- } finally {
- try {
- if (rollback) {
- rollback(false);
- }
- } catch (Exception e) {
- logger.log(Level.WARNING, "Could not do rollback of outer transaction", e);
- }
- try {
- resumeTransaction(suspendedTransaction);
- } catch (Exception e) {
- logger.log(Level.WARNING, "Could not resume outer transaction", e);
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
deleted file mode 100644
index 377c856..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-
-@Named("PROPAGATION_NEVER")
-public class NeverJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Exception {
-
- if (hasActiveTransaction()) {
- throw new IllegalStateException(
- "Policy 'PROPAGATION_NEVER' is configured but an active transaction was found!");
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
deleted file mode 100644
index c3c6bfc..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-import javax.transaction.Transaction;
-
-@Named("PROPAGATION_NOT_SUPPORTED")
-public class NotSupportedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Throwable {
-
- Transaction suspendedTransaction = null;
- try {
-
- suspendedTransaction = suspendTransaction();
- runnable.run();
-
- } finally {
- resumeTransaction(suspendedTransaction);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
deleted file mode 100644
index b40dd80..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-
-@Named("PROPAGATION_REQUIRED")
-public class RequiredJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Throwable {
-
- runWithTransaction(runnable, !hasActiveTransaction());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
deleted file mode 100644
index 4b1fa47..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-import javax.transaction.Transaction;
-
-@Named("PROPAGATION_REQUIRES_NEW")
-public class RequiresNewJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Throwable {
-
- Transaction suspendedTransaction = null;
- try {
-
- suspendedTransaction = suspendTransaction();
- runWithTransaction(runnable, true);
-
- } finally {
- resumeTransaction(suspendedTransaction);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
deleted file mode 100644
index 28ba016..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.inject.Named;
-
-@Named("PROPAGATION_SUPPORTS")
-public class SupportsJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
-
- @Override
- public void run(final Runnable runnable) throws Throwable {
-
- runnable.run();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
deleted file mode 100644
index 9d01cbe..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.RouteContext;
-
-/**
- * Used to expose the method 'resolvePolicy' used by
- * {@link JtaTransactionErrorHandlerBuilder} to resolve configured policy
- * references.
- */
-public class TransactedDefinition extends org.apache.camel.model.TransactedDefinition {
-
- @Override
- public Policy resolvePolicy(RouteContext routeContext) {
- return super.resolvePolicy(routeContext);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
deleted file mode 100644
index 651074e..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
+++ /dev/null
@@ -1,370 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.transaction.TransactionRolledbackException;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.Navigate;
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.processor.ErrorHandlerSupport;
-import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.spi.ShutdownPrepared;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-
-/**
- * Does transactional execution according given policy. This class is based on
- * {@link org.apache.camel.spring.spi.TransactionErrorHandler} excluding
- * redelivery functionality. In the Spring implementation redelivering is done
- * within the transaction which is not appropriate in JTA since every error
- * breaks the current transaction.
- */
-public class TransactionErrorHandler extends ErrorHandlerSupport
- implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
-
- protected final Processor output;
- protected volatile boolean preparingShutdown;
-
- private ExceptionPolicyStrategy exceptionPolicy;
-
- private JtaTransactionPolicy transactionPolicy;
-
- private final String transactionKey;
-
- private final LoggingLevel rollbackLoggingLevel;
-
- /**
- * Creates the transaction error handler.
- *
- * @param camelContext
- * the camel context
- * @param output
- * outer processor that should use this default error handler
- * @param exceptionPolicyStrategy
- * strategy for onException handling
- * @param transactionPolicy
- * the transaction policy
- * @param executorService
- * the {@link java.util.concurrent.ScheduledExecutorService} to
- * be used for redelivery thread pool. Can be <tt>null</tt>.
- * @param rollbackLoggingLevel
- * logging level to use for logging transaction rollback occurred
- */
- public TransactionErrorHandler(CamelContext camelContext, Processor output,
- ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
- ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) {
-
- this.output = output;
- this.transactionPolicy = transactionPolicy;
- this.rollbackLoggingLevel = rollbackLoggingLevel;
- this.transactionKey = ObjectHelper.getIdentityHashCode(transactionPolicy);
-
- setExceptionPolicy(exceptionPolicyStrategy);
-
- }
-
- public void process(Exchange exchange) throws Exception {
-
- // we have to run this synchronously as a JTA Transaction does *not*
- // support using multiple threads to span a transaction
- if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
- // already transacted by this transaction template
- // so lets just let the error handler process it
- processByErrorHandler(exchange);
- } else {
- // not yet wrapped in transaction so lets do that
- // and then have it invoke the error handler from within that
- // transaction
- processInTransaction(exchange);
- }
-
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
-
- // invoke this synchronous method as JTA Transaction does *not*
- // support using multiple threads to span a transaction
- try {
- process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
-
- // notify callback we are done synchronously
- callback.done(true);
- return true;
-
- }
-
- protected void processInTransaction(final Exchange exchange) throws Exception {
- // is the exchange redelivered, for example JMS brokers support such
- // details
- Boolean externalRedelivered = exchange.isExternalRedelivered();
- final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
- final String ids = ExchangeHelper.logIds(exchange);
-
- try {
- // mark the beginning of this transaction boundary
- exchange.getUnitOfWork().beginTransactedBy(transactionKey);
-
- // do in transaction
- logTransactionBegin(redelivered, ids);
- doInTransactionTemplate(exchange);
- logTransactionCommit(redelivered, ids);
-
- } catch (TransactionRolledbackException e) {
- // do not set as exception, as its just a dummy exception to force
- // spring TX to rollback
- logTransactionRollback(redelivered, ids, null, true);
- } catch (Throwable e) {
- exchange.setException(e);
- logTransactionRollback(redelivered, ids, e, false);
- } finally {
- // mark the end of this transaction boundary
- exchange.getUnitOfWork().endTransactedBy(transactionKey);
- }
-
- // if it was a local rollback only then remove its marker so outer
- // transaction wont see the marker
- Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
- if (onlyLast != null && onlyLast) {
- // we only want this logged at debug level
- if (log.isDebugEnabled()) {
- // log exception if there was a cause exception so we have the
- // stack trace
- Exception cause = exchange.getException();
- if (cause != null) {
- log.debug("Transaction rollback (" + transactionKey + ") redelivered(" + redelivered + ") for "
- + ids + " due exchange was marked for rollbackOnlyLast and caught: ", cause);
- } else {
- log.debug(
- "Transaction rollback ({}) redelivered({}) for {} "
- + "due exchange was marked for rollbackOnlyLast",
- new Object[] { transactionKey, redelivered, ids });
- }
- }
- // remove caused exception due we was marked as rollback only last
- // so by removing the exception, any outer transaction will not be
- // affected
- exchange.setException(null);
- }
- }
-
- public void setTransactionPolicy(JtaTransactionPolicy transactionPolicy) {
- this.transactionPolicy = transactionPolicy;
- }
-
- protected void doInTransactionTemplate(final Exchange exchange) throws Throwable {
-
- // spring transaction template is working best with rollback if you
- // throw it a runtime exception
- // otherwise it may not rollback messages send to JMS queues etc.
- transactionPolicy.run(new JtaTransactionPolicy.Runnable() {
-
- @Override
- public void run() throws Throwable {
-
- // wrapper exception to throw if the exchange failed
- // IMPORTANT: Must be a runtime exception to let Spring regard
- // it as to do "rollback"
- Throwable rce;
-
- // and now let process the exchange by the error handler
- processByErrorHandler(exchange);
-
- // after handling and still an exception or marked as rollback
- // only then rollback
- if (exchange.getException() != null || exchange.isRollbackOnly()) {
-
- // wrap exception in transacted exception
- if (exchange.getException() != null) {
- rce = exchange.getException();
- } else {
- // create dummy exception to force spring transaction
- // manager to rollback
- rce = new TransactionRolledbackException();
- }
-
- // throw runtime exception to force rollback (which works
- // best to rollback with Spring transaction manager)
- if (log.isTraceEnabled()) {
- log.trace("Throwing runtime exception to force transaction to rollback on {}",
- transactionPolicy);
- }
- throw rce;
- }
- }
-
- });
-
- }
-
- /**
- * Processes the {@link Exchange} using the error handler.
- * <p/>
- * This implementation will invoke ensure this occurs synchronously, that
- * means if the async routing engine did kick in, then this implementation
- * will wait for the task to complete before it continues.
- *
- * @param exchange
- * the exchange
- */
- protected void processByErrorHandler(final Exchange exchange) {
-
- try {
- output.process(exchange);
- } catch (Throwable e) {
- throw new RuntimeCamelException(e);
- }
-
- }
-
- /**
- * Logs the transaction begin
- */
- private void logTransactionBegin(String redelivered, String ids) {
- if (log.isDebugEnabled()) {
- log.debug("Transaction begin ({}) redelivered({}) for {})",
- new Object[] { transactionKey, redelivered, ids });
- }
- }
-
- /**
- * Logs the transaction commit
- */
- private void logTransactionCommit(String redelivered, String ids) {
- if ("true".equals(redelivered)) {
- // okay its a redelivered message so log at INFO level if
- // rollbackLoggingLevel is INFO or higher
- // this allows people to know that the redelivered message was
- // committed this time
- if (rollbackLoggingLevel == LoggingLevel.INFO || rollbackLoggingLevel == LoggingLevel.WARN
- || rollbackLoggingLevel == LoggingLevel.ERROR) {
- log.info("Transaction commit ({}) redelivered({}) for {})",
- new Object[] { transactionKey, redelivered, ids });
- // return after we have logged
- return;
- }
- }
-
- // log non redelivered by default at DEBUG level
- log.debug("Transaction commit ({}) redelivered({}) for {})", new Object[] { transactionKey, redelivered, ids });
- }
-
- /**
- * Logs the transaction rollback.
- */
- private void logTransactionRollback(String redelivered, String ids, Throwable e, boolean rollbackOnly) {
- if (rollbackLoggingLevel == LoggingLevel.OFF) {
- return;
- } else if (rollbackLoggingLevel == LoggingLevel.ERROR && log.isErrorEnabled()) {
- if (rollbackOnly) {
- log.error("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
- new Object[] { transactionKey, redelivered, ids });
- } else {
- log.error("Transaction rollback ({}) redelivered({}) for {} caught: {}",
- new Object[] { transactionKey, redelivered, ids, e.getMessage() });
- }
- } else if (rollbackLoggingLevel == LoggingLevel.WARN && log.isWarnEnabled()) {
- if (rollbackOnly) {
- log.warn("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
- new Object[] { transactionKey, redelivered, ids });
- } else {
- log.warn("Transaction rollback ({}) redelivered({}) for {} caught: {}",
- new Object[] { transactionKey, redelivered, ids, e.getMessage() });
- }
- } else if (rollbackLoggingLevel == LoggingLevel.INFO && log.isInfoEnabled()) {
- if (rollbackOnly) {
- log.info("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
- new Object[] { transactionKey, redelivered, ids });
- } else {
- log.info("Transaction rollback ({}) redelivered({}) for {} caught: {}",
- new Object[] { transactionKey, redelivered, ids, e.getMessage() });
- }
- } else if (rollbackLoggingLevel == LoggingLevel.DEBUG && log.isDebugEnabled()) {
- if (rollbackOnly) {
- log.debug("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
- new Object[] { transactionKey, redelivered, ids });
- } else {
- log.debug("Transaction rollback ({}) redelivered({}) for {} caught: {}",
- new Object[] { transactionKey, redelivered, ids, e.getMessage() });
- }
- } else if (rollbackLoggingLevel == LoggingLevel.TRACE && log.isTraceEnabled()) {
- if (rollbackOnly) {
- log.trace("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
- new Object[] { transactionKey, redelivered, ids });
- } else {
- log.trace("Transaction rollback ({}) redelivered({}) for {} caught: {}",
- new Object[] { transactionKey, redelivered, ids, e.getMessage() });
- }
- }
- }
-
- public void setExceptionPolicy(ExceptionPolicyStrategy exceptionPolicy) {
- this.exceptionPolicy = exceptionPolicy;
- }
-
- public ExceptionPolicyStrategy getExceptionPolicy() {
- return exceptionPolicy;
- }
-
- @Override
- public Processor getOutput() {
- return output;
- }
-
- @Override
- protected void doStart() throws Exception {
- ServiceHelper.startServices(output);
- preparingShutdown = false;
- }
-
- @Override
- protected void doStop() throws Exception {
- // noop, do not stop any services which we only do when shutting down
- // as the error handler can be context scoped, and should not stop in
- // case
- // a route stops
- }
-
- @Override
- protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(output);
- }
-
- @Override
- public boolean supportTransacted() {
- return true;
- }
-
- public boolean hasNext() {
- return output != null;
- }
-
- @Override
- public List<Processor> next() {
- if (!hasNext()) {
- return null;
- }
- List<Processor> answer = new ArrayList<Processor>(1);
- answer.add(output);
- return answer;
- }
-
- @Override
- public void prepareShutdown(boolean suspendOnly, boolean forced) {
- // prepare for shutdown, eg do not allow redelivery if configured
- log.trace("Prepare shutdown on error handler {}", this);
- preparingShutdown = true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
deleted file mode 100644
index 4d17f8a..0000000
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package org.apache.camel.cdi.jta;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-
-import org.apache.camel.CamelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Helper methods for transaction handling
- */
-public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionalJtaTransactionPolicy.class);
-
- protected void runWithTransaction(final Runnable runnable, final boolean isNew) throws Throwable {
-
- if (isNew) {
- begin();
- }
- try {
- runnable.run();
- } catch (RuntimeException e) {
- rollback(isNew);
- throw e;
- } catch (Error e) {
- rollback(isNew);
- throw e;
- } catch (Throwable e) {
- rollback(isNew);
- throw e;
- }
- if (isNew) {
- commit();
- }
- return;
-
- }
-
- private void begin() throws Exception {
-
- transactionManager.begin();
-
- }
-
- private void commit() throws Exception {
-
- try {
- transactionManager.commit();
- } catch (HeuristicMixedException e) {
- throw new CamelException("Unable to commit transaction", e);
- } catch (HeuristicRollbackException e) {
- throw new CamelException("Unable to commit transaction", e);
- } catch (RollbackException e) {
- throw new CamelException("Unable to commit transaction", e);
- } catch (SystemException e) {
- throw new CamelException("Unable to commit transaction", e);
- } catch (RuntimeException e) {
- rollback(true);
- throw e;
- } catch (Exception e) {
- rollback(true);
- throw e;
- } catch (Error e) {
- rollback(true);
- throw e;
- }
-
- }
-
- protected void rollback(boolean isNew) throws Exception {
-
- try {
-
- if (isNew) {
- transactionManager.rollback();
- } else {
- transactionManager.setRollbackOnly();
- }
-
- } catch (Throwable e) {
-
- LOG.warn("Could not rollback transaction!", e);
-
- }
-
- }
-
- protected Transaction suspendTransaction() throws Exception {
-
- return transactionManager.suspend();
-
- }
-
- protected void resumeTransaction(final Transaction suspendedTransaction) {
-
- if (suspendedTransaction == null) {
- return;
- }
-
- try {
- transactionManager.resume(suspendedTransaction);
- } catch (Throwable e) {
- LOG.warn("Could not resume transaction!", e);
- }
-
- }
-
- protected boolean hasActiveTransaction() throws Exception {
-
- return transactionManager.getStatus() != Status.STATUS_MARKED_ROLLBACK
- && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandler.java
new file mode 100644
index 0000000..867d23b
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandler.java
@@ -0,0 +1,48 @@
+package org.apache.camel.cdi.transaction;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.util.CamelLogger;
+
+/**
+ * This error handler does redelivering. If the transaction fails it can be
+ * retried if configured to do so. In the Spring implementation redelivering is
+ * done within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class JtaTransactionErrorHandler extends org.apache.camel.processor.RedeliveryErrorHandler {
+
+ public JtaTransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
+ Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+ ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+ Predicate retryWhile, ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel,
+ Processor onExceptionOccurredProcessor) {
+
+ super(camelContext,
+ new TransactionErrorHandler(camelContext,
+ output,
+ exceptionPolicyStrategy,
+ transactionPolicy,
+ executorService,
+ rollbackLoggingLevel),
+ logger,
+ redeliveryProcessor,
+ redeliveryPolicy,
+ null,
+ null,
+ false,
+ false,
+ retryWhile,
+ executorService,
+ null,
+ onExceptionOccurredProcessor);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
new file mode 100644
index 0000000..2020763
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
@@ -0,0 +1,164 @@
+package org.apache.camel.cdi.transaction;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds transactional error handlers. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}.
+ */
+public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
+
+ private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
+
+ public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = JtaTransactionErrorHandlerBuilder.class.getName()
+ + "#rollbackLoggingLevel";
+
+ private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
+
+ private JtaTransactionPolicy transactionPolicy;
+
+ private String policyRef;
+
+ @Override
+ public boolean supportTransacted() {
+ return true;
+ }
+
+ @Override
+ public ErrorHandlerBuilder cloneBuilder() {
+
+ final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder();
+ cloneBuilder(answer);
+ return answer;
+
+ }
+
+ @Override
+ protected void cloneBuilder(DefaultErrorHandlerBuilder other) {
+
+ super.cloneBuilder(other);
+ if (other instanceof JtaTransactionErrorHandlerBuilder) {
+ final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other;
+ transactionPolicy = otherTx.transactionPolicy;
+ rollbackLoggingLevel = otherTx.rollbackLoggingLevel;
+ }
+
+ }
+
+ public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
+
+ // resolve policy reference, if given
+ if (transactionPolicy == null) {
+
+ if (policyRef != null) {
+
+ final TransactedDefinition transactedDefinition = new TransactedDefinition();
+ transactedDefinition.setRef(policyRef);
+ final Policy policy = transactedDefinition.resolvePolicy(routeContext);
+ if (policy != null) {
+ if (!(policy instanceof JtaTransactionPolicy)) {
+ throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
+ + policyRef.getClass().getName() + "' but an instance of '"
+ + JtaTransactionPolicy.class.getName() + "' is required!");
+ }
+ transactionPolicy = (JtaTransactionPolicy) policy;
+ }
+
+ }
+
+ }
+
+ // try to lookup default policy
+ if (transactionPolicy == null) {
+
+ LOG.debug(
+ "No tranaction policiy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
+
+ Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class);
+ if (mapPolicy != null && mapPolicy.size() == 1) {
+ TransactedPolicy policy = mapPolicy.values().iterator().next();
+ if (policy != null && policy instanceof JtaTransactionPolicy) {
+ transactionPolicy = ((JtaTransactionPolicy) policy);
+ }
+ }
+
+ if (transactionPolicy == null) {
+ TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class);
+ if (policy != null && policy instanceof JtaTransactionPolicy) {
+ transactionPolicy = ((JtaTransactionPolicy) policy);
+ }
+ }
+
+ if (transactionPolicy != null) {
+ LOG.debug("Found TransactionPolicy in registry to use: " + transactionPolicy);
+ }
+
+ }
+
+ ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
+
+ final CamelContext camelContext = routeContext.getCamelContext();
+ final Map<String, String> properties = camelContext.getProperties();
+ if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
+ rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
+ }
+
+ JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
+ processor,
+ getLogger(),
+ getOnRedelivery(),
+ getRedeliveryPolicy(),
+ getExceptionPolicyStrategy(),
+ transactionPolicy,
+ getRetryWhilePolicy(camelContext),
+ getExecutorService(camelContext),
+ rollbackLoggingLevel,
+ getOnExceptionOccurred());
+
+ // configure error handler before we can use it
+ configure(routeContext, answer);
+ return answer;
+
+ }
+
+ public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
+ policyRef = ref;
+ return this;
+ }
+
+ public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
+ this.transactionPolicy = transactionPolicy;
+ return this;
+ }
+
+ public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
+ this.rollbackLoggingLevel = rollbackLoggingLevel;
+ return this;
+ }
+
+ protected CamelLogger createLogger() {
+ return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR);
+ }
+
+ @Override
+ public String toString() {
+ return "JtaTransactionErrorHandlerBuilder";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
new file mode 100644
index 0000000..ff2a8ab
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
@@ -0,0 +1,136 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.annotation.Resource;
+import javax.transaction.TransactionManager;
+
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets a proper error handler. This class is based on
+ * {@link org.apache.camel.spring.spi.SpringTransactionPolicy}.
+ * <p>
+ * This class requires the resource {@link TransactionManager} to be available
+ * through JNDI url "java:/TransactionManager"
+ */
+public abstract class JtaTransactionPolicy implements TransactedPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionPolicy.class);
+
+ public static interface Runnable {
+ void run() throws Throwable;
+ }
+
+ @Resource(lookup = "java:/TransactionManager")
+ protected TransactionManager transactionManager;
+
+ @Override
+ public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
+ // do not inherit since we create our own
+ // (otherwise the default error handler would be used two times
+ // because we inherit it on our own but only in case of a
+ // non-transactional
+ // error handler)
+ definition.setInheritErrorHandler(false);
+ }
+
+ public abstract void run(final Runnable runnable) throws Throwable;
+
+ @Override
+ public Processor wrap(RouteContext routeContext, Processor processor) {
+
+ JtaTransactionErrorHandler answer;
+
+ // the goal is to configure the error handler builder on the route as a
+ // transacted error handler,
+ // either its already a transacted or if not we replace it with a
+ // transacted one that we configure here
+ // and wrap the processor in the transacted error handler as we can have
+ // transacted routes that change
+ // propagation behavior, eg: from A required -> B -> requiresNew C
+ // (advanced use-case)
+ // if we should not support this we do not need to wrap the processor as
+ // we only need one transacted error handler
+
+ // find the existing error handler builder
+ ErrorHandlerBuilder builder = (ErrorHandlerBuilder) routeContext.getRoute().getErrorHandlerBuilder();
+
+ // check if its a ref if so then do a lookup
+ if (builder instanceof ErrorHandlerBuilderRef) {
+ // its a reference to a error handler so lookup the reference
+ ErrorHandlerBuilderRef builderRef = (ErrorHandlerBuilderRef) builder;
+ String ref = builderRef.getRef();
+ // only lookup if there was explicit an error handler builder
+ // configured
+ // otherwise its just the "default" that has not explicit been
+ // configured
+ // and if so then we can safely replace that with our transacted
+ // error handler
+ if (ErrorHandlerBuilderRef.isErrorHandlerBuilderConfigured(ref)) {
+ LOG.debug("Looking up ErrorHandlerBuilder with ref: {}", ref);
+ builder = (ErrorHandlerBuilder) ErrorHandlerBuilderRef.lookupErrorHandlerBuilder(routeContext, ref);
+ }
+ }
+
+ JtaTransactionErrorHandlerBuilder txBuilder;
+ if ((builder != null) && builder.supportTransacted()) {
+ if (!(builder instanceof JtaTransactionErrorHandlerBuilder)) {
+ throw new RuntimeCamelException("The given transactional error handler builder '" + builder
+ + "' is not of type '" + JtaTransactionErrorHandlerBuilder.class.getName()
+ + "' which is required in this environment!");
+ }
+ LOG.debug("The ErrorHandlerBuilder configured is a JtaTransactionErrorHandlerBuilder: {}", builder);
+ txBuilder = (JtaTransactionErrorHandlerBuilder) builder.cloneBuilder();
+ } else {
+ LOG.debug(
+ "No or no transactional ErrorHandlerBuilder configured, will use default JtaTransactionErrorHandlerBuilder settings");
+ txBuilder = new JtaTransactionErrorHandlerBuilder();
+ }
+
+ txBuilder.setTransactionPolicy(this);
+
+ // use error handlers from the configured builder
+ if (builder != null) {
+ txBuilder.setErrorHandlers(routeContext, builder.getErrorHandlers(routeContext));
+ }
+
+ answer = createTransactionErrorHandler(routeContext, processor, txBuilder);
+ answer.setExceptionPolicy(txBuilder.getExceptionPolicyStrategy());
+ // configure our answer based on the existing error handler
+ txBuilder.configure(routeContext, answer);
+
+ // set the route to use our transacted error handler builder
+ routeContext.getRoute().setErrorHandlerBuilder(txBuilder);
+
+ // return with wrapped transacted error handler
+ return answer;
+
+ }
+
+ protected JtaTransactionErrorHandler createTransactionErrorHandler(RouteContext routeContext, Processor processor,
+ ErrorHandlerBuilder builder) {
+
+ JtaTransactionErrorHandler answer;
+ try {
+ answer = (JtaTransactionErrorHandler) builder.createErrorHandler(routeContext, processor);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ return answer;
+
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/MandatoryJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/MandatoryJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/MandatoryJtaTransactionPolicy.java
new file mode 100644
index 0000000..c1f4224
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/MandatoryJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_MANDATORY")
+public class MandatoryJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Exception {
+
+ if (!hasActiveTransaction()) {
+ throw new IllegalStateException(
+ "Policy 'PROPAGATION_MANDATORY' is configured but no active transaction was found!");
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NestedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NestedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NestedJtaTransactionPolicy.java
new file mode 100644
index 0000000..72be5ed
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NestedJtaTransactionPolicy.java
@@ -0,0 +1,44 @@
+package org.apache.camel.cdi.transaction;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NESTED")
+public class NestedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ private static final Logger logger = Logger.getLogger(NestedJtaTransactionPolicy.class.getCanonicalName());
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+
+ Transaction suspendedTransaction = null;
+ boolean rollback = false;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runWithTransaction(runnable, true);
+
+ } catch (Throwable e) {
+ rollback = true;
+ throw e;
+ } finally {
+ try {
+ if (rollback) {
+ rollback(false);
+ }
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Could not do rollback of outer transaction", e);
+ }
+ try {
+ resumeTransaction(suspendedTransaction);
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Could not resume outer transaction", e);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NeverJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NeverJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NeverJtaTransactionPolicy.java
new file mode 100644
index 0000000..3adda21
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NeverJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_NEVER")
+public class NeverJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Exception {
+
+ if (hasActiveTransaction()) {
+ throw new IllegalStateException(
+ "Policy 'PROPAGATION_NEVER' is configured but an active transaction was found!");
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NotSupportedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NotSupportedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NotSupportedJtaTransactionPolicy.java
new file mode 100644
index 0000000..30eea29
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/NotSupportedJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NOT_SUPPORTED")
+public class NotSupportedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+
+ Transaction suspendedTransaction = null;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runnable.run();
+
+ } finally {
+ resumeTransaction(suspendedTransaction);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiredJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiredJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiredJtaTransactionPolicy.java
new file mode 100644
index 0000000..9ca448b
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiredJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_REQUIRED")
+public class RequiredJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+
+ runWithTransaction(runnable, !hasActiveTransaction());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiresNewJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiresNewJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiresNewJtaTransactionPolicy.java
new file mode 100644
index 0000000..607dbed
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/RequiresNewJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_REQUIRES_NEW")
+public class RequiresNewJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+
+ Transaction suspendedTransaction = null;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runWithTransaction(runnable, true);
+
+ } finally {
+ resumeTransaction(suspendedTransaction);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/SupportsJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/SupportsJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/SupportsJtaTransactionPolicy.java
new file mode 100644
index 0000000..1242610
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/SupportsJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.transaction;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_SUPPORTS")
+public class SupportsJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+
+ runnable.run();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactedDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactedDefinition.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactedDefinition.java
new file mode 100644
index 0000000..d84d585
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactedDefinition.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.transaction;
+
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Used to expose the method 'resolvePolicy' used by
+ * {@link JtaTransactionErrorHandlerBuilder} to resolve configured policy
+ * references.
+ */
+public class TransactedDefinition extends org.apache.camel.model.TransactedDefinition {
+
+ @Override
+ public Policy resolvePolicy(RouteContext routeContext) {
+ return super.resolvePolicy(routeContext);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6aecd51c/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
new file mode 100644
index 0000000..5f43d57
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
@@ -0,0 +1,370 @@
+package org.apache.camel.cdi.transaction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.transaction.TransactionRolledbackException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.ErrorHandlerSupport;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.spi.ShutdownPrepared;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * Does transactional execution according given policy. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandler} excluding
+ * redelivery functionality. In the Spring implementation redelivering is done
+ * within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class TransactionErrorHandler extends ErrorHandlerSupport
+ implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+
+ protected final Processor output;
+ protected volatile boolean preparingShutdown;
+
+ private ExceptionPolicyStrategy exceptionPolicy;
+
+ private JtaTransactionPolicy transactionPolicy;
+
+ private final String transactionKey;
+
+ private final LoggingLevel rollbackLoggingLevel;
+
+ /**
+ * Creates the transaction error handler.
+ *
+ * @param camelContext
+ * the camel context
+ * @param output
+ * outer processor that should use this default error handler
+ * @param exceptionPolicyStrategy
+ * strategy for onException handling
+ * @param transactionPolicy
+ * the transaction policy
+ * @param executorService
+ * the {@link java.util.concurrent.ScheduledExecutorService} to
+ * be used for redelivery thread pool. Can be <tt>null</tt>.
+ * @param rollbackLoggingLevel
+ * logging level to use for logging transaction rollback occurred
+ */
+ public TransactionErrorHandler(CamelContext camelContext, Processor output,
+ ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+ ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) {
+
+ this.output = output;
+ this.transactionPolicy = transactionPolicy;
+ this.rollbackLoggingLevel = rollbackLoggingLevel;
+ this.transactionKey = ObjectHelper.getIdentityHashCode(transactionPolicy);
+
+ setExceptionPolicy(exceptionPolicyStrategy);
+
+ }
+
+ public void process(Exchange exchange) throws Exception {
+
+ // we have to run this synchronously as a JTA Transaction does *not*
+ // support using multiple threads to span a transaction
+ if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+ // already transacted by this transaction template
+ // so lets just let the error handler process it
+ processByErrorHandler(exchange);
+ } else {
+ // not yet wrapped in transaction so lets do that
+ // and then have it invoke the error handler from within that
+ // transaction
+ processInTransaction(exchange);
+ }
+
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+
+ // invoke this synchronous method as JTA Transaction does *not*
+ // support using multiple threads to span a transaction
+ try {
+ process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ // notify callback we are done synchronously
+ callback.done(true);
+ return true;
+
+ }
+
+ protected void processInTransaction(final Exchange exchange) throws Exception {
+ // is the exchange redelivered, for example JMS brokers support such
+ // details
+ Boolean externalRedelivered = exchange.isExternalRedelivered();
+ final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
+ final String ids = ExchangeHelper.logIds(exchange);
+
+ try {
+ // mark the beginning of this transaction boundary
+ exchange.getUnitOfWork().beginTransactedBy(transactionKey);
+
+ // do in transaction
+ logTransactionBegin(redelivered, ids);
+ doInTransactionTemplate(exchange);
+ logTransactionCommit(redelivered, ids);
+
+ } catch (TransactionRolledbackException e) {
+ // do not set as exception, as its just a dummy exception to force
+ // spring TX to rollback
+ logTransactionRollback(redelivered, ids, null, true);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ logTransactionRollback(redelivered, ids, e, false);
+ } finally {
+ // mark the end of this transaction boundary
+ exchange.getUnitOfWork().endTransactedBy(transactionKey);
+ }
+
+ // if it was a local rollback only then remove its marker so outer
+ // transaction wont see the marker
+ Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
+ if (onlyLast != null && onlyLast) {
+ // we only want this logged at debug level
+ if (log.isDebugEnabled()) {
+ // log exception if there was a cause exception so we have the
+ // stack trace
+ Exception cause = exchange.getException();
+ if (cause != null) {
+ log.debug("Transaction rollback (" + transactionKey + ") redelivered(" + redelivered + ") for "
+ + ids + " due exchange was marked for rollbackOnlyLast and caught: ", cause);
+ } else {
+ log.debug(
+ "Transaction rollback ({}) redelivered({}) for {} "
+ + "due exchange was marked for rollbackOnlyLast",
+ new Object[] { transactionKey, redelivered, ids });
+ }
+ }
+ // remove caused exception due we was marked as rollback only last
+ // so by removing the exception, any outer transaction will not be
+ // affected
+ exchange.setException(null);
+ }
+ }
+
+ public void setTransactionPolicy(JtaTransactionPolicy transactionPolicy) {
+ this.transactionPolicy = transactionPolicy;
+ }
+
+ protected void doInTransactionTemplate(final Exchange exchange) throws Throwable {
+
+ // spring transaction template is working best with rollback if you
+ // throw it a runtime exception
+ // otherwise it may not rollback messages send to JMS queues etc.
+ transactionPolicy.run(new JtaTransactionPolicy.Runnable() {
+
+ @Override
+ public void run() throws Throwable {
+
+ // wrapper exception to throw if the exchange failed
+ // IMPORTANT: Must be a runtime exception to let Spring regard
+ // it as to do "rollback"
+ Throwable rce;
+
+ // and now let process the exchange by the error handler
+ processByErrorHandler(exchange);
+
+ // after handling and still an exception or marked as rollback
+ // only then rollback
+ if (exchange.getException() != null || exchange.isRollbackOnly()) {
+
+ // wrap exception in transacted exception
+ if (exchange.getException() != null) {
+ rce = exchange.getException();
+ } else {
+ // create dummy exception to force spring transaction
+ // manager to rollback
+ rce = new TransactionRolledbackException();
+ }
+
+ // throw runtime exception to force rollback (which works
+ // best to rollback with Spring transaction manager)
+ if (log.isTraceEnabled()) {
+ log.trace("Throwing runtime exception to force transaction to rollback on {}",
+ transactionPolicy);
+ }
+ throw rce;
+ }
+ }
+
+ });
+
+ }
+
+ /**
+ * Processes the {@link Exchange} using the error handler.
+ * <p/>
+ * This implementation will invoke ensure this occurs synchronously, that
+ * means if the async routing engine did kick in, then this implementation
+ * will wait for the task to complete before it continues.
+ *
+ * @param exchange
+ * the exchange
+ */
+ protected void processByErrorHandler(final Exchange exchange) {
+
+ try {
+ output.process(exchange);
+ } catch (Throwable e) {
+ throw new RuntimeCamelException(e);
+ }
+
+ }
+
+ /**
+ * Logs the transaction begin
+ */
+ private void logTransactionBegin(String redelivered, String ids) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transaction begin ({}) redelivered({}) for {})",
+ new Object[] { transactionKey, redelivered, ids });
+ }
+ }
+
+ /**
+ * Logs the transaction commit
+ */
+ private void logTransactionCommit(String redelivered, String ids) {
+ if ("true".equals(redelivered)) {
+ // okay its a redelivered message so log at INFO level if
+ // rollbackLoggingLevel is INFO or higher
+ // this allows people to know that the redelivered message was
+ // committed this time
+ if (rollbackLoggingLevel == LoggingLevel.INFO || rollbackLoggingLevel == LoggingLevel.WARN
+ || rollbackLoggingLevel == LoggingLevel.ERROR) {
+ log.info("Transaction commit ({}) redelivered({}) for {})",
+ new Object[] { transactionKey, redelivered, ids });
+ // return after we have logged
+ return;
+ }
+ }
+
+ // log non redelivered by default at DEBUG level
+ log.debug("Transaction commit ({}) redelivered({}) for {})", new Object[] { transactionKey, redelivered, ids });
+ }
+
+ /**
+ * Logs the transaction rollback.
+ */
+ private void logTransactionRollback(String redelivered, String ids, Throwable e, boolean rollbackOnly) {
+ if (rollbackLoggingLevel == LoggingLevel.OFF) {
+ return;
+ } else if (rollbackLoggingLevel == LoggingLevel.ERROR && log.isErrorEnabled()) {
+ if (rollbackOnly) {
+ log.error("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+ new Object[] { transactionKey, redelivered, ids });
+ } else {
+ log.error("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+ new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+ }
+ } else if (rollbackLoggingLevel == LoggingLevel.WARN && log.isWarnEnabled()) {
+ if (rollbackOnly) {
+ log.warn("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+ new Object[] { transactionKey, redelivered, ids });
+ } else {
+ log.warn("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+ new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+ }
+ } else if (rollbackLoggingLevel == LoggingLevel.INFO && log.isInfoEnabled()) {
+ if (rollbackOnly) {
+ log.info("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+ new Object[] { transactionKey, redelivered, ids });
+ } else {
+ log.info("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+ new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+ }
+ } else if (rollbackLoggingLevel == LoggingLevel.DEBUG && log.isDebugEnabled()) {
+ if (rollbackOnly) {
+ log.debug("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+ new Object[] { transactionKey, redelivered, ids });
+ } else {
+ log.debug("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+ new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+ }
+ } else if (rollbackLoggingLevel == LoggingLevel.TRACE && log.isTraceEnabled()) {
+ if (rollbackOnly) {
+ log.trace("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+ new Object[] { transactionKey, redelivered, ids });
+ } else {
+ log.trace("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+ new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+ }
+ }
+ }
+
+ public void setExceptionPolicy(ExceptionPolicyStrategy exceptionPolicy) {
+ this.exceptionPolicy = exceptionPolicy;
+ }
+
+ public ExceptionPolicyStrategy getExceptionPolicy() {
+ return exceptionPolicy;
+ }
+
+ @Override
+ public Processor getOutput() {
+ return output;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(output);
+ preparingShutdown = false;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop, do not stop any services which we only do when shutting down
+ // as the error handler can be context scoped, and should not stop in
+ // case
+ // a route stops
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(output);
+ }
+
+ @Override
+ public boolean supportTransacted() {
+ return true;
+ }
+
+ public boolean hasNext() {
+ return output != null;
+ }
+
+ @Override
+ public List<Processor> next() {
+ if (!hasNext()) {
+ return null;
+ }
+ List<Processor> answer = new ArrayList<Processor>(1);
+ answer.add(output);
+ return answer;
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // prepare for shutdown, eg do not allow redelivery if configured
+ log.trace("Prepare shutdown on error handler {}", this);
+ preparingShutdown = true;
+ }
+
+}