You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2017/04/13 14:52:35 UTC

[3/7] camel git commit: CAMEL-10685: TransactionErrorHandler and TransactionPolicy for Camel CDI / JavaEE

CAMEL-10685: TransactionErrorHandler and TransactionPolicy for Camel CDI / JavaEE


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8092e89f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8092e89f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8092e89f

Branch: refs/heads/master
Commit: 8092e89f63515889bbd26457d1380b5c6cf68eb1
Parents: cc4ccd4
Author: Stephan Pelikan <st...@wdw-elab.de>
Authored: Thu Apr 13 11:11:52 2017 +0200
Committer: Antonin Stefanutti <an...@stefanutti.fr>
Committed: Thu Apr 13 16:52:19 2017 +0200

----------------------------------------------------------------------
 components/camel-cdi/pom.xml                    |   8 +
 .../apache/camel/cdi/CdiCamelConfiguration.java |  13 +
 .../camel/cdi/CdiCamelConfigurationEvent.java   |  17 +-
 .../org/apache/camel/cdi/CdiCamelExtension.java |  22 +-
 .../org/apache/camel/cdi/JtaRouteBuilder.java   |  24 ++
 .../cdi/jta/JtaTransactionErrorHandler.java     |  48 +++
 .../jta/JtaTransactionErrorHandlerBuilder.java  | 164 ++++++++
 .../camel/cdi/jta/JtaTransactionPolicy.java     | 136 +++++++
 .../cdi/jta/MandatoryJtaTransactionPolicy.java  |  18 +
 .../cdi/jta/NestedJtaTransactionPolicy.java     |  44 +++
 .../cdi/jta/NeverJtaTransactionPolicy.java      |  18 +
 .../jta/NotSupportedJtaTransactionPolicy.java   |  24 ++
 .../cdi/jta/RequiredJtaTransactionPolicy.java   |  15 +
 .../jta/RequiresNewJtaTransactionPolicy.java    |  24 ++
 .../cdi/jta/SupportsJtaTransactionPolicy.java   |  15 +
 .../camel/cdi/jta/TransactedDefinition.java     |  18 +
 .../camel/cdi/jta/TransactionErrorHandler.java  | 370 +++++++++++++++++++
 .../jta/TransactionalJtaTransactionPolicy.java  | 121 ++++++
 parent/pom.xml                                  |   1 +
 19 files changed, 1088 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-cdi/pom.xml b/components/camel-cdi/pom.xml
index c54521e..9fb19d7 100644
--- a/components/camel-cdi/pom.xml
+++ b/components/camel-cdi/pom.xml
@@ -115,6 +115,14 @@
       <optional>true</optional>
     </dependency>
 
+	<dependency>
+	    <groupId>javax.transaction</groupId>
+	    <artifactId>javax.transaction-api</artifactId>
+	    <version>${jta-api-1.2-version}</version>
+	    <scope>provided</scope>
+	    <optional>true</optional>
+	</dependency>
+
     <!-- test dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
index 4ea53c6..d14ccf0 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
@@ -38,4 +38,17 @@ public interface CdiCamelConfiguration {
      * @return Current state of autoConfigureRoutes parameter.
      */
     boolean autoConfigureRoutes();
+    
+    /**
+     * Overrides the Camel CDI behavior to automatically start all Camel contexts.
+     * @return this Camel CDI configuration
+     * @throws IllegalStateException if called outside of the observer method invocation
+     */
+    CdiCamelConfiguration autoStartContexts(boolean autoStartContexts);
+
+    /**
+     * @return Current state of autoStartContexts parameter.
+     */
+    boolean autoStartContexts();
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
index bf96ea0..2f1c7fc 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
@@ -19,6 +19,7 @@ package org.apache.camel.cdi;
 /* package-private */ final class CdiCamelConfigurationEvent implements CdiCamelConfiguration {
 
     private boolean autoConfigureRoutes = true;
+    private boolean autoStartContexts = true;
     private volatile boolean unmodifiable;
 
     @Override
@@ -33,14 +34,26 @@ package org.apache.camel.cdi;
         return autoConfigureRoutes;
     }
 
+    @Override
+    public CdiCamelConfiguration autoStartContexts(boolean autoStartContexts) {
+        throwsIfUnmodifiable();
+        this.autoStartContexts = autoStartContexts;
+        return this;
+    }
+
+    @Override
+    public boolean autoStartContexts() {
+        return autoStartContexts;
+    }
+
     void unmodifiable() {
         unmodifiable = true;
     }
 
     private void throwsIfUnmodifiable() {
         if (unmodifiable) {
-            throw new IllegalStateException("Camel CDI configuration event must not be used outside "
-            + "its observer method!");
+            throw new IllegalStateException(
+                    "Camel CDI configuration event must not be used outside " + "its observer method!");
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
index 54a5b9a..8862476 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
@@ -415,16 +415,18 @@ public class CdiCamelExtension implements Extension {
             .forEach(bean -> getReference(manager, bean.getBeanClass(), bean).toString());
 
         // Start Camel contexts
-        for (CamelContext context : contexts) {
-            if (ServiceStatus.Started.equals(context.getStatus())) {
-                continue;
-            }
-            logger.info("Camel CDI is starting Camel context [{}]", context.getName());
-            try {
-                context.start();
-            } catch (Exception exception) {
-                adv.addDeploymentProblem(exception);
-            }
+        if (configuration.autoStartContexts()) {
+	        for (CamelContext context : contexts) {
+	            if (ServiceStatus.Started.equals(context.getStatus())) {
+	                continue;
+	            }
+	            logger.info("Camel CDI is starting Camel context [{}]", context.getName());
+	            try {
+	                context.start();
+	            } catch (Exception exception) {
+	                adv.addDeploymentProblem(exception);
+	            }
+	        }
         }
 
         // Clean-up

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
new file mode 100644
index 0000000..838a8a1
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cdi.jta.JtaTransactionErrorHandlerBuilder;
+
+/**
+ * An extension of the {@link RouteBuilder} to provide some additional helper
+ * methods
+ *
+ * @version
+ */
+public abstract class JtaRouteBuilder extends RouteBuilder {
+
+    /**
+     * Creates a transaction error handler that will lookup in application
+     * context for an exiting transaction manager.
+     *
+     * @return the created error handler
+     */
+    public JtaTransactionErrorHandlerBuilder transactionErrorHandler() {
+        return new JtaTransactionErrorHandlerBuilder();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
new file mode 100644
index 0000000..8a7f0d2
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
@@ -0,0 +1,48 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.util.CamelLogger;
+
+/**
+ * This error handler does redelivering. If the transaction fails it can be
+ * retried if configured to do so. In the Spring implementation redelivering is
+ * done within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class JtaTransactionErrorHandler extends org.apache.camel.processor.RedeliveryErrorHandler {
+
+    public JtaTransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
+            Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+            ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+            Predicate retryWhile, ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel,
+            Processor onExceptionOccurredProcessor) {
+
+        super(camelContext,
+                new TransactionErrorHandler(camelContext,
+                        output,
+                        exceptionPolicyStrategy,
+                        transactionPolicy,
+                        executorService,
+                        rollbackLoggingLevel),
+                logger,
+                redeliveryProcessor,
+                redeliveryPolicy,
+                null,
+                null,
+                false,
+                false,
+                retryWhile,
+                executorService,
+                null,
+                onExceptionOccurredProcessor);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
new file mode 100644
index 0000000..6977e9d
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
@@ -0,0 +1,164 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds transactional error handlers. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}.
+ */
+public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
+
+    private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
+
+    public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = JtaTransactionErrorHandlerBuilder.class.getName()
+            + "#rollbackLoggingLevel";
+
+    private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
+
+    private JtaTransactionPolicy transactionPolicy;
+
+    private String policyRef;
+
+    @Override
+    public boolean supportTransacted() {
+        return true;
+    }
+
+    @Override
+    public ErrorHandlerBuilder cloneBuilder() {
+
+        final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder();
+        cloneBuilder(answer);
+        return answer;
+
+    }
+
+    @Override
+    protected void cloneBuilder(DefaultErrorHandlerBuilder other) {
+
+        super.cloneBuilder(other);
+        if (other instanceof JtaTransactionErrorHandlerBuilder) {
+            final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other;
+            transactionPolicy = otherTx.transactionPolicy;
+            rollbackLoggingLevel = otherTx.rollbackLoggingLevel;
+        }
+
+    }
+
+    public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
+
+        // resolve policy reference, if given
+        if (transactionPolicy == null) {
+
+            if (policyRef != null) {
+
+                final TransactedDefinition transactedDefinition = new TransactedDefinition();
+                transactedDefinition.setRef(policyRef);
+                final Policy policy = transactedDefinition.resolvePolicy(routeContext);
+                if (policy != null) {
+                    if (!(policy instanceof JtaTransactionPolicy)) {
+                        throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
+                                + policyRef.getClass().getName() + "' but an instance of '"
+                                + JtaTransactionPolicy.class.getName() + "' is required!");
+                    }
+                    transactionPolicy = (JtaTransactionPolicy) policy;
+                }
+
+            }
+
+        }
+
+        // try to lookup default policy
+        if (transactionPolicy == null) {
+
+            LOG.debug(
+                    "No tranaction policiy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
+
+            Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class);
+            if (mapPolicy != null && mapPolicy.size() == 1) {
+                TransactedPolicy policy = mapPolicy.values().iterator().next();
+                if (policy != null && policy instanceof JtaTransactionPolicy) {
+                    transactionPolicy = ((JtaTransactionPolicy) policy);
+                }
+            }
+
+            if (transactionPolicy == null) {
+                TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class);
+                if (policy != null && policy instanceof JtaTransactionPolicy) {
+                    transactionPolicy = ((JtaTransactionPolicy) policy);
+                }
+            }
+
+            if (transactionPolicy != null) {
+                LOG.debug("Found TransactionPolicy in registry to use: " + transactionPolicy);
+            }
+
+        }
+
+        ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
+
+        final CamelContext camelContext = routeContext.getCamelContext();
+        final Map<String, String> properties = camelContext.getProperties();
+        if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
+            rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
+        }
+
+        JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
+                processor,
+                getLogger(),
+                getOnRedelivery(),
+                getRedeliveryPolicy(),
+                getExceptionPolicyStrategy(),
+                transactionPolicy,
+                getRetryWhilePolicy(camelContext),
+                getExecutorService(camelContext),
+                rollbackLoggingLevel,
+                getOnExceptionOccurred());
+
+        // configure error handler before we can use it
+        configure(routeContext, answer);
+        return answer;
+
+    }
+
+    public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
+        policyRef = ref;
+        return this;
+    }
+
+    public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
+        this.transactionPolicy = transactionPolicy;
+        return this;
+    }
+
+    public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
+        this.rollbackLoggingLevel = rollbackLoggingLevel;
+        return this;
+    }
+
+    protected CamelLogger createLogger() {
+        return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR);
+    }
+
+    @Override
+    public String toString() {
+        return "JtaTransactionErrorHandlerBuilder";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
new file mode 100644
index 0000000..c4c70c2
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
@@ -0,0 +1,136 @@
+package org.apache.camel.cdi.jta;
+
+import javax.annotation.Resource;
+import javax.transaction.TransactionManager;
+
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets a proper error handler. This class is based on
+ * {@link org.apache.camel.spring.spi.SpringTransactionPolicy}.
+ * <p>
+ * This class requires the resource {@link TransactionManager} to be available
+ * through JNDI url &quot;java:/TransactionManager&quot;
+ */
+public abstract class JtaTransactionPolicy implements TransactedPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionPolicy.class);
+
+    public static interface Runnable {
+        void run() throws Throwable;
+    }
+
+    @Resource(lookup = "java:/TransactionManager")
+    protected TransactionManager transactionManager;
+
+    @Override
+    public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
+        // do not inherit since we create our own
+        // (otherwise the default error handler would be used two times
+        // because we inherit it on our own but only in case of a
+        // non-transactional
+        // error handler)
+        definition.setInheritErrorHandler(false);
+    }
+
+    public abstract void run(final Runnable runnable) throws Throwable;
+
+    @Override
+    public Processor wrap(RouteContext routeContext, Processor processor) {
+
+        JtaTransactionErrorHandler answer;
+
+        // the goal is to configure the error handler builder on the route as a
+        // transacted error handler,
+        // either its already a transacted or if not we replace it with a
+        // transacted one that we configure here
+        // and wrap the processor in the transacted error handler as we can have
+        // transacted routes that change
+        // propagation behavior, eg: from A required -> B -> requiresNew C
+        // (advanced use-case)
+        // if we should not support this we do not need to wrap the processor as
+        // we only need one transacted error handler
+
+        // find the existing error handler builder
+        ErrorHandlerBuilder builder = (ErrorHandlerBuilder) routeContext.getRoute().getErrorHandlerBuilder();
+
+        // check if its a ref if so then do a lookup
+        if (builder instanceof ErrorHandlerBuilderRef) {
+            // its a reference to a error handler so lookup the reference
+            ErrorHandlerBuilderRef builderRef = (ErrorHandlerBuilderRef) builder;
+            String ref = builderRef.getRef();
+            // only lookup if there was explicit an error handler builder
+            // configured
+            // otherwise its just the "default" that has not explicit been
+            // configured
+            // and if so then we can safely replace that with our transacted
+            // error handler
+            if (ErrorHandlerBuilderRef.isErrorHandlerBuilderConfigured(ref)) {
+                LOG.debug("Looking up ErrorHandlerBuilder with ref: {}", ref);
+                builder = (ErrorHandlerBuilder) ErrorHandlerBuilderRef.lookupErrorHandlerBuilder(routeContext, ref);
+            }
+        }
+
+        JtaTransactionErrorHandlerBuilder txBuilder;
+        if ((builder != null) && builder.supportTransacted()) {
+            if (!(builder instanceof JtaTransactionErrorHandlerBuilder)) {
+                throw new RuntimeCamelException("The given transactional error handler builder '" + builder
+                        + "' is not of type '" + JtaTransactionErrorHandlerBuilder.class.getName()
+                        + "' which is required in this environment!");
+            }
+            LOG.debug("The ErrorHandlerBuilder configured is a JtaTransactionErrorHandlerBuilder: {}", builder);
+            txBuilder = (JtaTransactionErrorHandlerBuilder) builder.cloneBuilder();
+        } else {
+            LOG.debug(
+                    "No or no transactional ErrorHandlerBuilder configured, will use default JtaTransactionErrorHandlerBuilder settings");
+            txBuilder = new JtaTransactionErrorHandlerBuilder();
+        }
+
+        txBuilder.setTransactionPolicy(this);
+
+        // use error handlers from the configured builder
+        if (builder != null) {
+            txBuilder.setErrorHandlers(routeContext, builder.getErrorHandlers(routeContext));
+        }
+
+        answer = createTransactionErrorHandler(routeContext, processor, txBuilder);
+        answer.setExceptionPolicy(txBuilder.getExceptionPolicyStrategy());
+        // configure our answer based on the existing error handler
+        txBuilder.configure(routeContext, answer);
+
+        // set the route to use our transacted error handler builder
+        routeContext.getRoute().setErrorHandlerBuilder(txBuilder);
+
+        // return with wrapped transacted error handler
+        return answer;
+
+    }
+
+    protected JtaTransactionErrorHandler createTransactionErrorHandler(RouteContext routeContext, Processor processor,
+            ErrorHandlerBuilder builder) {
+
+        JtaTransactionErrorHandler answer;
+        try {
+            answer = (JtaTransactionErrorHandler) builder.createErrorHandler(routeContext, processor);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+        return answer;
+
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getName();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
new file mode 100644
index 0000000..260ad69
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_MANDATORY")
+public class MandatoryJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Exception {
+
+        if (!hasActiveTransaction()) {
+            throw new IllegalStateException(
+                    "Policy 'PROPAGATION_MANDATORY' is configured but no active transaction was found!");
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
new file mode 100644
index 0000000..6ce116a
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
@@ -0,0 +1,44 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NESTED")
+public class NestedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    private static final Logger logger = Logger.getLogger(NestedJtaTransactionPolicy.class.getCanonicalName());
+
+    @Override
+    public void run(final Runnable runnable) throws Throwable {
+
+        Transaction suspendedTransaction = null;
+        boolean rollback = false;
+        try {
+
+            suspendedTransaction = suspendTransaction();
+            runWithTransaction(runnable, true);
+
+        } catch (Throwable e) {
+            rollback = true;
+            throw e;
+        } finally {
+            try {
+                if (rollback) {
+                    rollback(false);
+                }
+            } catch (Exception e) {
+                logger.log(Level.WARNING, "Could not do rollback of outer transaction", e);
+            }
+            try {
+                resumeTransaction(suspendedTransaction);
+            } catch (Exception e) {
+                logger.log(Level.WARNING, "Could not resume outer transaction", e);
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
new file mode 100644
index 0000000..377c856
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_NEVER")
+public class NeverJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Exception {
+
+        if (hasActiveTransaction()) {
+            throw new IllegalStateException(
+                    "Policy 'PROPAGATION_NEVER' is configured but an active transaction was found!");
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
new file mode 100644
index 0000000..c3c6bfc
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NOT_SUPPORTED")
+public class NotSupportedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Throwable {
+
+        Transaction suspendedTransaction = null;
+        try {
+
+            suspendedTransaction = suspendTransaction();
+            runnable.run();
+
+        } finally {
+            resumeTransaction(suspendedTransaction);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
new file mode 100644
index 0000000..b40dd80
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_REQUIRED")
+public class RequiredJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Throwable {
+
+        runWithTransaction(runnable, !hasActiveTransaction());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
new file mode 100644
index 0000000..4b1fa47
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_REQUIRES_NEW")
+public class RequiresNewJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Throwable {
+
+        Transaction suspendedTransaction = null;
+        try {
+
+            suspendedTransaction = suspendTransaction();
+            runWithTransaction(runnable, true);
+
+        } finally {
+            resumeTransaction(suspendedTransaction);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
new file mode 100644
index 0000000..28ba016
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_SUPPORTS")
+public class SupportsJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+    @Override
+    public void run(final Runnable runnable) throws Throwable {
+
+        runnable.run();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
new file mode 100644
index 0000000..9d01cbe
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Used to expose the method &apos;resolvePolicy&apos; used by
+ * {@link JtaTransactionErrorHandlerBuilder} to resolve configured policy
+ * references.
+ */
+public class TransactedDefinition extends org.apache.camel.model.TransactedDefinition {
+
+    @Override
+    public Policy resolvePolicy(RouteContext routeContext) {
+        return super.resolvePolicy(routeContext);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
new file mode 100644
index 0000000..651074e
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
@@ -0,0 +1,370 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.transaction.TransactionRolledbackException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.ErrorHandlerSupport;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.spi.ShutdownPrepared;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * Does transactional execution according given policy. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandler} excluding
+ * redelivery functionality. In the Spring implementation redelivering is done
+ * within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class TransactionErrorHandler extends ErrorHandlerSupport
+        implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+
+    protected final Processor output;
+    protected volatile boolean preparingShutdown;
+
+    private ExceptionPolicyStrategy exceptionPolicy;
+
+    private JtaTransactionPolicy transactionPolicy;
+
+    private final String transactionKey;
+
+    private final LoggingLevel rollbackLoggingLevel;
+
+    /**
+     * Creates the transaction error handler.
+     *
+     * @param camelContext
+     *            the camel context
+     * @param output
+     *            outer processor that should use this default error handler
+     * @param exceptionPolicyStrategy
+     *            strategy for onException handling
+     * @param transactionPolicy
+     *            the transaction policy
+     * @param executorService
+     *            the {@link java.util.concurrent.ScheduledExecutorService} to
+     *            be used for redelivery thread pool. Can be <tt>null</tt>.
+     * @param rollbackLoggingLevel
+     *            logging level to use for logging transaction rollback occurred
+     */
+    public TransactionErrorHandler(CamelContext camelContext, Processor output,
+            ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+            ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) {
+
+        this.output = output;
+        this.transactionPolicy = transactionPolicy;
+        this.rollbackLoggingLevel = rollbackLoggingLevel;
+        this.transactionKey = ObjectHelper.getIdentityHashCode(transactionPolicy);
+
+        setExceptionPolicy(exceptionPolicyStrategy);
+
+    }
+
+    public void process(Exchange exchange) throws Exception {
+
+        // we have to run this synchronously as a JTA Transaction does *not*
+        // support using multiple threads to span a transaction
+        if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+            // already transacted by this transaction template
+            // so lets just let the error handler process it
+            processByErrorHandler(exchange);
+        } else {
+            // not yet wrapped in transaction so lets do that
+            // and then have it invoke the error handler from within that
+            // transaction
+            processInTransaction(exchange);
+        }
+
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+
+        // invoke this synchronous method as JTA Transaction does *not*
+        // support using multiple threads to span a transaction
+        try {
+            process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        // notify callback we are done synchronously
+        callback.done(true);
+        return true;
+
+    }
+
+    protected void processInTransaction(final Exchange exchange) throws Exception {
+        // is the exchange redelivered, for example JMS brokers support such
+        // details
+        Boolean externalRedelivered = exchange.isExternalRedelivered();
+        final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
+        final String ids = ExchangeHelper.logIds(exchange);
+
+        try {
+            // mark the beginning of this transaction boundary
+            exchange.getUnitOfWork().beginTransactedBy(transactionKey);
+
+            // do in transaction
+            logTransactionBegin(redelivered, ids);
+            doInTransactionTemplate(exchange);
+            logTransactionCommit(redelivered, ids);
+
+        } catch (TransactionRolledbackException e) {
+            // do not set as exception, as its just a dummy exception to force
+            // spring TX to rollback
+            logTransactionRollback(redelivered, ids, null, true);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            logTransactionRollback(redelivered, ids, e, false);
+        } finally {
+            // mark the end of this transaction boundary
+            exchange.getUnitOfWork().endTransactedBy(transactionKey);
+        }
+
+        // if it was a local rollback only then remove its marker so outer
+        // transaction wont see the marker
+        Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
+        if (onlyLast != null && onlyLast) {
+            // we only want this logged at debug level
+            if (log.isDebugEnabled()) {
+                // log exception if there was a cause exception so we have the
+                // stack trace
+                Exception cause = exchange.getException();
+                if (cause != null) {
+                    log.debug("Transaction rollback (" + transactionKey + ") redelivered(" + redelivered + ") for "
+                            + ids + " due exchange was marked for rollbackOnlyLast and caught: ", cause);
+                } else {
+                    log.debug(
+                            "Transaction rollback ({}) redelivered({}) for {} "
+                                    + "due exchange was marked for rollbackOnlyLast",
+                            new Object[] { transactionKey, redelivered, ids });
+                }
+            }
+            // remove caused exception due we was marked as rollback only last
+            // so by removing the exception, any outer transaction will not be
+            // affected
+            exchange.setException(null);
+        }
+    }
+
+    public void setTransactionPolicy(JtaTransactionPolicy transactionPolicy) {
+        this.transactionPolicy = transactionPolicy;
+    }
+
+    protected void doInTransactionTemplate(final Exchange exchange) throws Throwable {
+
+        // spring transaction template is working best with rollback if you
+        // throw it a runtime exception
+        // otherwise it may not rollback messages send to JMS queues etc.
+        transactionPolicy.run(new JtaTransactionPolicy.Runnable() {
+
+            @Override
+            public void run() throws Throwable {
+
+                // wrapper exception to throw if the exchange failed
+                // IMPORTANT: Must be a runtime exception to let Spring regard
+                // it as to do "rollback"
+                Throwable rce;
+
+                // and now let process the exchange by the error handler
+                processByErrorHandler(exchange);
+
+                // after handling and still an exception or marked as rollback
+                // only then rollback
+                if (exchange.getException() != null || exchange.isRollbackOnly()) {
+
+                    // wrap exception in transacted exception
+                    if (exchange.getException() != null) {
+                        rce = exchange.getException();
+                    } else {
+                        // create dummy exception to force spring transaction
+                        // manager to rollback
+                        rce = new TransactionRolledbackException();
+                    }
+
+                    // throw runtime exception to force rollback (which works
+                    // best to rollback with Spring transaction manager)
+                    if (log.isTraceEnabled()) {
+                        log.trace("Throwing runtime exception to force transaction to rollback on {}",
+                                transactionPolicy);
+                    }
+                    throw rce;
+                }
+            }
+
+        });
+
+    }
+
+    /**
+     * Processes the {@link Exchange} using the error handler.
+     * <p/>
+     * This implementation will invoke ensure this occurs synchronously, that
+     * means if the async routing engine did kick in, then this implementation
+     * will wait for the task to complete before it continues.
+     *
+     * @param exchange
+     *            the exchange
+     */
+    protected void processByErrorHandler(final Exchange exchange) {
+
+        try {
+            output.process(exchange);
+        } catch (Throwable e) {
+            throw new RuntimeCamelException(e);
+        }
+
+    }
+
+    /**
+     * Logs the transaction begin
+     */
+    private void logTransactionBegin(String redelivered, String ids) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction begin ({}) redelivered({}) for {})",
+                    new Object[] { transactionKey, redelivered, ids });
+        }
+    }
+
+    /**
+     * Logs the transaction commit
+     */
+    private void logTransactionCommit(String redelivered, String ids) {
+        if ("true".equals(redelivered)) {
+            // okay its a redelivered message so log at INFO level if
+            // rollbackLoggingLevel is INFO or higher
+            // this allows people to know that the redelivered message was
+            // committed this time
+            if (rollbackLoggingLevel == LoggingLevel.INFO || rollbackLoggingLevel == LoggingLevel.WARN
+                    || rollbackLoggingLevel == LoggingLevel.ERROR) {
+                log.info("Transaction commit ({}) redelivered({}) for {})",
+                        new Object[] { transactionKey, redelivered, ids });
+                // return after we have logged
+                return;
+            }
+        }
+
+        // log non redelivered by default at DEBUG level
+        log.debug("Transaction commit ({}) redelivered({}) for {})", new Object[] { transactionKey, redelivered, ids });
+    }
+
+    /**
+     * Logs the transaction rollback.
+     */
+    private void logTransactionRollback(String redelivered, String ids, Throwable e, boolean rollbackOnly) {
+        if (rollbackLoggingLevel == LoggingLevel.OFF) {
+            return;
+        } else if (rollbackLoggingLevel == LoggingLevel.ERROR && log.isErrorEnabled()) {
+            if (rollbackOnly) {
+                log.error("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.error("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.WARN && log.isWarnEnabled()) {
+            if (rollbackOnly) {
+                log.warn("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.warn("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.INFO && log.isInfoEnabled()) {
+            if (rollbackOnly) {
+                log.info("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.info("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.DEBUG && log.isDebugEnabled()) {
+            if (rollbackOnly) {
+                log.debug("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.debug("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.TRACE && log.isTraceEnabled()) {
+            if (rollbackOnly) {
+                log.trace("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.trace("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        }
+    }
+
+    public void setExceptionPolicy(ExceptionPolicyStrategy exceptionPolicy) {
+        this.exceptionPolicy = exceptionPolicy;
+    }
+
+    public ExceptionPolicyStrategy getExceptionPolicy() {
+        return exceptionPolicy;
+    }
+
+    @Override
+    public Processor getOutput() {
+        return output;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(output);
+        preparingShutdown = false;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop, do not stop any services which we only do when shutting down
+        // as the error handler can be context scoped, and should not stop in
+        // case
+        // a route stops
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(output);
+    }
+
+    @Override
+    public boolean supportTransacted() {
+        return true;
+    }
+
+    public boolean hasNext() {
+        return output != null;
+    }
+
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<Processor>(1);
+        answer.add(output);
+        return answer;
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // prepare for shutdown, eg do not allow redelivery if configured
+        log.trace("Prepare shutdown on error handler {}", this);
+        preparingShutdown = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
new file mode 100644
index 0000000..4d17f8a
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
@@ -0,0 +1,121 @@
+package org.apache.camel.cdi.jta;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
+import org.apache.camel.CamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for transaction handling
+ */
+public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionalJtaTransactionPolicy.class);
+
+    protected void runWithTransaction(final Runnable runnable, final boolean isNew) throws Throwable {
+
+        if (isNew) {
+            begin();
+        }
+        try {
+            runnable.run();
+        } catch (RuntimeException e) {
+            rollback(isNew);
+            throw e;
+        } catch (Error e) {
+            rollback(isNew);
+            throw e;
+        } catch (Throwable e) {
+            rollback(isNew);
+            throw e;
+        }
+        if (isNew) {
+            commit();
+        }
+        return;
+
+    }
+
+    private void begin() throws Exception {
+
+        transactionManager.begin();
+
+    }
+
+    private void commit() throws Exception {
+
+        try {
+            transactionManager.commit();
+        } catch (HeuristicMixedException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (HeuristicRollbackException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (RollbackException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (SystemException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (RuntimeException e) {
+            rollback(true);
+            throw e;
+        } catch (Exception e) {
+            rollback(true);
+            throw e;
+        } catch (Error e) {
+            rollback(true);
+            throw e;
+        }
+
+    }
+
+    protected void rollback(boolean isNew) throws Exception {
+
+        try {
+
+            if (isNew) {
+                transactionManager.rollback();
+            } else {
+                transactionManager.setRollbackOnly();
+            }
+
+        } catch (Throwable e) {
+
+            LOG.warn("Could not rollback transaction!", e);
+
+        }
+
+    }
+
+    protected Transaction suspendTransaction() throws Exception {
+
+        return transactionManager.suspend();
+
+    }
+
+    protected void resumeTransaction(final Transaction suspendedTransaction) {
+
+        if (suspendedTransaction == null) {
+            return;
+        }
+
+        try {
+            transactionManager.resume(suspendedTransaction);
+        } catch (Throwable e) {
+            LOG.warn("Could not resume transaction!", e);
+        }
+
+    }
+
+    protected boolean hasActiveTransaction() throws Exception {
+
+        return transactionManager.getStatus() != Status.STATUS_MARKED_ROLLBACK
+                && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index df10dde..09719b1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -105,6 +105,7 @@
     <cdi-api-1.1-version>1.1</cdi-api-1.1-version>
     <cdi-api-1.2-version>1.2</cdi-api-1.2-version>
     <cdi-api-2.0-version>2.0-PFD2</cdi-api-2.0-version>
+    <jta-api-1.2-version>1.2</jta-api-1.2-version>
     <cglib-bundle-version>3.2.4_1</cglib-bundle-version>
     <cglib-version>3.2.4</cglib-version>
     <chronicle-bytes-version>1.7.35</chronicle-bytes-version>