You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2017/04/13 14:52:35 UTC
[3/7] camel git commit: CAMEL-10685: TransactionErrorHandler and
TransactionPolicy for Camel CDI / JavaEE
CAMEL-10685: TransactionErrorHandler and TransactionPolicy for Camel CDI / JavaEE
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8092e89f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8092e89f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8092e89f
Branch: refs/heads/master
Commit: 8092e89f63515889bbd26457d1380b5c6cf68eb1
Parents: cc4ccd4
Author: Stephan Pelikan <st...@wdw-elab.de>
Authored: Thu Apr 13 11:11:52 2017 +0200
Committer: Antonin Stefanutti <an...@stefanutti.fr>
Committed: Thu Apr 13 16:52:19 2017 +0200
----------------------------------------------------------------------
components/camel-cdi/pom.xml | 8 +
.../apache/camel/cdi/CdiCamelConfiguration.java | 13 +
.../camel/cdi/CdiCamelConfigurationEvent.java | 17 +-
.../org/apache/camel/cdi/CdiCamelExtension.java | 22 +-
.../org/apache/camel/cdi/JtaRouteBuilder.java | 24 ++
.../cdi/jta/JtaTransactionErrorHandler.java | 48 +++
.../jta/JtaTransactionErrorHandlerBuilder.java | 164 ++++++++
.../camel/cdi/jta/JtaTransactionPolicy.java | 136 +++++++
.../cdi/jta/MandatoryJtaTransactionPolicy.java | 18 +
.../cdi/jta/NestedJtaTransactionPolicy.java | 44 +++
.../cdi/jta/NeverJtaTransactionPolicy.java | 18 +
.../jta/NotSupportedJtaTransactionPolicy.java | 24 ++
.../cdi/jta/RequiredJtaTransactionPolicy.java | 15 +
.../jta/RequiresNewJtaTransactionPolicy.java | 24 ++
.../cdi/jta/SupportsJtaTransactionPolicy.java | 15 +
.../camel/cdi/jta/TransactedDefinition.java | 18 +
.../camel/cdi/jta/TransactionErrorHandler.java | 370 +++++++++++++++++++
.../jta/TransactionalJtaTransactionPolicy.java | 121 ++++++
parent/pom.xml | 1 +
19 files changed, 1088 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-cdi/pom.xml b/components/camel-cdi/pom.xml
index c54521e..9fb19d7 100644
--- a/components/camel-cdi/pom.xml
+++ b/components/camel-cdi/pom.xml
@@ -115,6 +115,14 @@
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>javax.transaction</groupId>
+ <artifactId>javax.transaction-api</artifactId>
+ <version>${jta-api-1.2-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
index 4ea53c6..d14ccf0 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java
@@ -38,4 +38,17 @@ public interface CdiCamelConfiguration {
* @return Current state of autoConfigureRoutes parameter.
*/
boolean autoConfigureRoutes();
+
+ /**
+ * Overrides the Camel CDI behavior to automatically start all Camel contexts.
+ * @return this Camel CDI configuration
+ * @throws IllegalStateException if called outside of the observer method invocation
+ */
+ CdiCamelConfiguration autoStartContexts(boolean autoStartContexts);
+
+ /**
+ * @return Current state of autoStartContexts parameter.
+ */
+ boolean autoStartContexts();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
index bf96ea0..2f1c7fc 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java
@@ -19,6 +19,7 @@ package org.apache.camel.cdi;
/* package-private */ final class CdiCamelConfigurationEvent implements CdiCamelConfiguration {
private boolean autoConfigureRoutes = true;
+ private boolean autoStartContexts = true;
private volatile boolean unmodifiable;
@Override
@@ -33,14 +34,26 @@ package org.apache.camel.cdi;
return autoConfigureRoutes;
}
+ @Override
+ public CdiCamelConfiguration autoStartContexts(boolean autoStartContexts) {
+ throwsIfUnmodifiable();
+ this.autoStartContexts = autoStartContexts;
+ return this;
+ }
+
+ @Override
+ public boolean autoStartContexts() {
+ return autoStartContexts;
+ }
+
void unmodifiable() {
unmodifiable = true;
}
private void throwsIfUnmodifiable() {
if (unmodifiable) {
- throw new IllegalStateException("Camel CDI configuration event must not be used outside "
- + "its observer method!");
+ throw new IllegalStateException(
+ "Camel CDI configuration event must not be used outside " + "its observer method!");
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
index 54a5b9a..8862476 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
@@ -415,16 +415,18 @@ public class CdiCamelExtension implements Extension {
.forEach(bean -> getReference(manager, bean.getBeanClass(), bean).toString());
// Start Camel contexts
- for (CamelContext context : contexts) {
- if (ServiceStatus.Started.equals(context.getStatus())) {
- continue;
- }
- logger.info("Camel CDI is starting Camel context [{}]", context.getName());
- try {
- context.start();
- } catch (Exception exception) {
- adv.addDeploymentProblem(exception);
- }
+ if (configuration.autoStartContexts()) {
+ for (CamelContext context : contexts) {
+ if (ServiceStatus.Started.equals(context.getStatus())) {
+ continue;
+ }
+ logger.info("Camel CDI is starting Camel context [{}]", context.getName());
+ try {
+ context.start();
+ } catch (Exception exception) {
+ adv.addDeploymentProblem(exception);
+ }
+ }
}
// Clean-up
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
new file mode 100644
index 0000000..838a8a1
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cdi.jta.JtaTransactionErrorHandlerBuilder;
+
+/**
+ * An extension of the {@link RouteBuilder} that provides additional helper
+ * methods for routes using the JTA transaction error handler.
+ *
+ * @version
+ */
+public abstract class JtaRouteBuilder extends RouteBuilder {
+
+ /**
+ * Creates a new {@link JtaTransactionErrorHandlerBuilder} with its default
+ * settings, to be configured further by the caller.
+ *
+ * @return the created error handler builder
+ */
+ public JtaTransactionErrorHandlerBuilder transactionErrorHandler() {
+ return new JtaTransactionErrorHandlerBuilder();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
new file mode 100644
index 0000000..8a7f0d2
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java
@@ -0,0 +1,48 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.util.CamelLogger;
+
+/**
+ * This error handler does redelivering. If the transaction fails it can be
+ * retried if configured to do so. In the Spring implementation redelivering is
+ * done within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class JtaTransactionErrorHandler extends org.apache.camel.processor.RedeliveryErrorHandler {
+ // 'output' is wrapped in a TransactionErrorHandler which becomes the output of the redelivering superclass.
+ public JtaTransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
+ Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+ ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+ Predicate retryWhile, ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel,
+ Processor onExceptionOccurredProcessor) {
+ // NOTE(review): the null/false arguments presumably leave the dead-letter options of RedeliveryErrorHandler unset - confirm against its constructor.
+ super(camelContext,
+ new TransactionErrorHandler(camelContext,
+ output,
+ exceptionPolicyStrategy,
+ transactionPolicy,
+ executorService,
+ rollbackLoggingLevel),
+ logger,
+ redeliveryProcessor,
+ redeliveryPolicy,
+ null,
+ null,
+ false,
+ false,
+ retryWhile,
+ executorService,
+ null,
+ onExceptionOccurredProcessor);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
new file mode 100644
index 0000000..6977e9d
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java
@@ -0,0 +1,164 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds transactional error handlers. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}.
+ */
+public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
+
+ private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
+
+ public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = JtaTransactionErrorHandlerBuilder.class.getName()
+ + "#rollbackLoggingLevel";
+
+ private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
+
+ private JtaTransactionPolicy transactionPolicy;
+
+ private String policyRef;
+
+ @Override
+ public boolean supportTransacted() {
+ return true;
+ }
+
+ @Override
+ public ErrorHandlerBuilder cloneBuilder() {
+
+ final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder();
+ cloneBuilder(answer);
+ return answer;
+
+ }
+
+ @Override
+ protected void cloneBuilder(DefaultErrorHandlerBuilder other) {
+ // 'other' is the fresh clone created by cloneBuilder(): copy this builder's settings INTO it, never the reverse
+ super.cloneBuilder(other);
+ if (other instanceof JtaTransactionErrorHandlerBuilder) {
+ final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other;
+ otherTx.transactionPolicy = transactionPolicy;
+ otherTx.rollbackLoggingLevel = rollbackLoggingLevel;
+ otherTx.policyRef = policyRef;
+ }
+ }
+
+ public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
+ // Policy resolution order: explicitly set policy, policyRef, the only TransactedPolicy in the registry, "PROPAGATION_REQUIRED".
+ // resolve policy reference, if given
+ if (transactionPolicy == null) {
+
+ if (policyRef != null) {
+ // resolve the reference the same way a transacted() definition would
+ final TransactedDefinition transactedDefinition = new TransactedDefinition();
+ transactedDefinition.setRef(policyRef);
+ final Policy policy = transactedDefinition.resolvePolicy(routeContext);
+ if (policy != null) {
+ if (!(policy instanceof JtaTransactionPolicy)) {
+ throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
+ + policy.getClass().getName() + "' but an instance of '"
+ + JtaTransactionPolicy.class.getName() + "' is required!");
+ }
+ transactionPolicy = (JtaTransactionPolicy) policy;
+ }
+
+ }
+
+ }
+
+ // try to lookup default policy
+ if (transactionPolicy == null) {
+
+ LOG.debug(
+ "No tranaction policiy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
+
+ Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class);
+ if (mapPolicy != null && mapPolicy.size() == 1) {
+ TransactedPolicy policy = mapPolicy.values().iterator().next();
+ if (policy instanceof JtaTransactionPolicy) {
+ transactionPolicy = ((JtaTransactionPolicy) policy);
+ }
+ }
+ // no unambiguous candidate: fall back to the policy registered under the default name
+ if (transactionPolicy == null) {
+ TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class);
+ if (policy instanceof JtaTransactionPolicy) {
+ transactionPolicy = ((JtaTransactionPolicy) policy);
+ }
+ }
+
+ if (transactionPolicy != null) {
+ LOG.debug("Found TransactionPolicy in registry to use: {}", transactionPolicy);
+ }
+
+ }
+ // a policy is mandatory from here on
+ ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
+ // a Camel context property, if present, overrides the configured rollback logging level
+ final CamelContext camelContext = routeContext.getCamelContext();
+ final Map<String, String> properties = camelContext.getProperties();
+ if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
+ rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
+ }
+
+ JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
+ processor,
+ getLogger(),
+ getOnRedelivery(),
+ getRedeliveryPolicy(),
+ getExceptionPolicyStrategy(),
+ transactionPolicy,
+ getRetryWhilePolicy(camelContext),
+ getExecutorService(camelContext),
+ rollbackLoggingLevel,
+ getOnExceptionOccurred());
+
+ // configure error handler before we can use it
+ configure(routeContext, answer);
+ return answer;
+
+ }
+
+ public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
+ policyRef = ref;
+ return this;
+ }
+
+ public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
+ this.transactionPolicy = transactionPolicy;
+ return this;
+ }
+
+ public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
+ this.rollbackLoggingLevel = rollbackLoggingLevel;
+ return this;
+ }
+
+ protected CamelLogger createLogger() {
+ return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR);
+ }
+
+ @Override
+ public String toString() {
+ return "JtaTransactionErrorHandlerBuilder";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
new file mode 100644
index 0000000..c4c70c2
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java
@@ -0,0 +1,136 @@
+package org.apache.camel.cdi.jta;
+
+import javax.annotation.Resource;
+import javax.transaction.TransactionManager;
+
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.TransactedPolicy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets a proper error handler. This class is based on
+ * {@link org.apache.camel.spring.spi.SpringTransactionPolicy}.
+ * <p>
+ * This class requires the resource {@link TransactionManager} to be available
+ * through JNDI url "java:/TransactionManager"
+ */
+public abstract class JtaTransactionPolicy implements TransactedPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionPolicy.class);
+
+ public static interface Runnable {
+ void run() throws Throwable;
+ }
+
+ @Resource(lookup = "java:/TransactionManager")
+ protected TransactionManager transactionManager;
+
+ @Override
+ public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
+ // do not inherit since we create our own
+ // (otherwise the default error handler would be used two times
+ // because we inherit it on our own but only in case of a
+ // non-transactional
+ // error handler)
+ definition.setInheritErrorHandler(false);
+ }
+
+ public abstract void run(final Runnable runnable) throws Throwable;
+
+ @Override
+ public Processor wrap(RouteContext routeContext, Processor processor) {
+ // Wraps the processor in a JtaTransactionErrorHandler bound to this policy and sets the matching builder on the route.
+ JtaTransactionErrorHandler answer;
+
+ // The goal is to configure the error handler builder on the route as
+ // a transacted error handler: either it already is a transacted one,
+ // or we replace it with a transacted one that we configure here.
+ // In addition we wrap the processor in the transacted error handler,
+ // because transacted routes may change the propagation behavior
+ // along the way, e.g. from A (required) -> B -> C (requiresNew),
+ // which is an advanced use-case.
+ // If this were not to be supported, wrapping the processor would be
+ // unnecessary, since a single transacted error handler would then
+ // be sufficient.
+
+ // find the existing error handler builder
+ ErrorHandlerBuilder builder = (ErrorHandlerBuilder) routeContext.getRoute().getErrorHandlerBuilder();
+
+ // check whether it is a reference and, if so, do a lookup
+ if (builder instanceof ErrorHandlerBuilderRef) {
+ // it is a reference to an error handler, so look up the reference
+ ErrorHandlerBuilderRef builderRef = (ErrorHandlerBuilderRef) builder;
+ String ref = builderRef.getRef();
+ // Only do the lookup if an error handler builder has been
+ // configured explicitly.
+ // Otherwise the reference is just the "default" one that has
+ // not been configured explicitly,
+ // and in that case we can safely replace it with our
+ // transacted error handler.
+ if (ErrorHandlerBuilderRef.isErrorHandlerBuilderConfigured(ref)) {
+ LOG.debug("Looking up ErrorHandlerBuilder with ref: {}", ref);
+ builder = (ErrorHandlerBuilder) ErrorHandlerBuilderRef.lookupErrorHandlerBuilder(routeContext, ref);
+ }
+ }
+
+ JtaTransactionErrorHandlerBuilder txBuilder;
+ if ((builder != null) && builder.supportTransacted()) {
+ if (!(builder instanceof JtaTransactionErrorHandlerBuilder)) {
+ throw new RuntimeCamelException("The given transactional error handler builder '" + builder
+ + "' is not of type '" + JtaTransactionErrorHandlerBuilder.class.getName()
+ + "' which is required in this environment!");
+ }
+ LOG.debug("The ErrorHandlerBuilder configured is a JtaTransactionErrorHandlerBuilder: {}", builder);
+ txBuilder = (JtaTransactionErrorHandlerBuilder) builder.cloneBuilder();
+ } else {
+ LOG.debug(
+ "No or no transactional ErrorHandlerBuilder configured, will use default JtaTransactionErrorHandlerBuilder settings");
+ txBuilder = new JtaTransactionErrorHandlerBuilder();
+ }
+
+ txBuilder.setTransactionPolicy(this);
+
+ // use error handlers from the configured builder
+ if (builder != null) {
+ txBuilder.setErrorHandlers(routeContext, builder.getErrorHandlers(routeContext));
+ }
+
+ answer = createTransactionErrorHandler(routeContext, processor, txBuilder);
+ answer.setExceptionPolicy(txBuilder.getExceptionPolicyStrategy());
+ // configure our answer based on the existing error handler (NOTE(review): the builder's createErrorHandler already calls configure - confirm this second call is needed)
+ txBuilder.configure(routeContext, answer);
+
+ // set the route to use our transacted error handler builder
+ routeContext.getRoute().setErrorHandlerBuilder(txBuilder);
+
+ // return the wrapped transacted error handler
+ return answer;
+
+ }
+
+ protected JtaTransactionErrorHandler createTransactionErrorHandler(RouteContext routeContext, Processor processor,
+ ErrorHandlerBuilder builder) {
+
+ JtaTransactionErrorHandler answer;
+ try {
+ answer = (JtaTransactionErrorHandler) builder.createErrorHandler(routeContext, processor);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ return answer;
+
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
new file mode 100644
index 0000000..260ad69
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_MANDATORY")
+public class MandatoryJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // fail fast without a transaction; otherwise join the active one exactly like PROPAGATION_REQUIRED does
+ if (!hasActiveTransaction()) {
+ throw new IllegalStateException(
+ "Policy 'PROPAGATION_MANDATORY' is configured but no active transaction was found!");
+ }
+ runWithTransaction(runnable, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
new file mode 100644
index 0000000..6ce116a
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java
@@ -0,0 +1,44 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NESTED")
+public class NestedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+ // NOTE(review): this class logs via java.util.logging while JtaTransactionPolicy uses SLF4J - confirm this is intended
+ private static final Logger logger = Logger.getLogger(NestedJtaTransactionPolicy.class.getCanonicalName());
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // Suspends the caller's transaction, runs the work via runWithTransaction(..., true) and resumes the suspended one afterwards.
+ Transaction suspendedTransaction = null;
+ boolean rollback = false;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runWithTransaction(runnable, true);
+
+ } catch (Throwable e) {
+ rollback = true;
+ throw e;
+ } finally {
+ try {
+ if (rollback) {
+ rollback(false); // NOTE(review): runs before the outer transaction is resumed below - confirm it targets the intended transaction
+ }
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Could not do rollback of outer transaction", e);
+ }
+ try {
+ resumeTransaction(suspendedTransaction);
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Could not resume outer transaction", e);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
new file mode 100644
index 0000000..377c856
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_NEVER")
+public class NeverJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // refuse to run inside a transaction; otherwise execute the work as-is, without any transaction handling
+ if (hasActiveTransaction()) {
+ throw new IllegalStateException(
+ "Policy 'PROPAGATION_NEVER' is configured but an active transaction was found!");
+ }
+ runnable.run();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
new file mode 100644
index 0000000..c3c6bfc
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_NOT_SUPPORTED")
+public class NotSupportedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // Runs the work directly (no runWithTransaction) after suspendTransaction(); the finally block always calls resumeTransaction().
+ Transaction suspendedTransaction = null;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runnable.run();
+
+ } finally {
+ resumeTransaction(suspendedTransaction); // NOTE(review): receives null if suspendTransaction() threw - confirm the helper tolerates that
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
new file mode 100644
index 0000000..b40dd80
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_REQUIRED")
+public class RequiredJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // second argument is true only when no transaction is active (presumably "start a new transaction" - confirm in runWithTransaction)
+ runWithTransaction(runnable, !hasActiveTransaction());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
new file mode 100644
index 0000000..4b1fa47
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+import javax.transaction.Transaction;
+
+@Named("PROPAGATION_REQUIRES_NEW")
+public class RequiresNewJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // Calls suspendTransaction(), runs the work via runWithTransaction(..., true), then always calls resumeTransaction() in finally.
+ Transaction suspendedTransaction = null;
+ try {
+
+ suspendedTransaction = suspendTransaction();
+ runWithTransaction(runnable, true);
+
+ } finally {
+ resumeTransaction(suspendedTransaction); // NOTE(review): receives null if suspendTransaction() threw - confirm the helper tolerates that
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
new file mode 100644
index 0000000..28ba016
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java
@@ -0,0 +1,15 @@
+package org.apache.camel.cdi.jta;
+
+import javax.inject.Named;
+
+@Named("PROPAGATION_SUPPORTS")
+public class SupportsJtaTransactionPolicy extends TransactionalJtaTransactionPolicy {
+
+ @Override
+ public void run(final Runnable runnable) throws Throwable {
+ // no transaction handling of its own: the work is simply executed, with or without a transaction being active
+ runnable.run();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
new file mode 100644
index 0000000..9d01cbe
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java
@@ -0,0 +1,18 @@
+package org.apache.camel.cdi.jta;
+
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Used to expose the method 'resolvePolicy' used by
+ * {@link JtaTransactionErrorHandlerBuilder} to resolve configured policy
+ * references.
+ */
+public class TransactedDefinition extends org.apache.camel.model.TransactedDefinition {
+
+ @Override
+ public Policy resolvePolicy(RouteContext routeContext) {
+ return super.resolvePolicy(routeContext);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
new file mode 100644
index 0000000..651074e
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java
@@ -0,0 +1,370 @@
+package org.apache.camel.cdi.jta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.transaction.TransactionRolledbackException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.ErrorHandlerSupport;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.spi.ShutdownPrepared;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * Does transactional execution according to the given policy. This class is based on
+ * {@link org.apache.camel.spring.spi.TransactionErrorHandler} excluding
+ * redelivery functionality. In the Spring implementation redelivering is done
+ * within the transaction which is not appropriate in JTA since every error
+ * breaks the current transaction.
+ */
+public class TransactionErrorHandler extends ErrorHandlerSupport
+ implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+
+ protected final Processor output;
+ protected volatile boolean preparingShutdown;
+
+ private ExceptionPolicyStrategy exceptionPolicy;
+
+ private JtaTransactionPolicy transactionPolicy;
+
+ private final String transactionKey;
+
+ private final LoggingLevel rollbackLoggingLevel;
+
+    /**
+     * Creates the transaction error handler.
+     *
+     * @param camelContext
+     *            the camel context (not referenced by this constructor)
+     * @param output
+     *            the processor this error handler delegates exchanges to
+     * @param exceptionPolicyStrategy
+     *            strategy for onException handling
+     * @param transactionPolicy
+     *            the transaction policy; its identity hash code becomes the
+     *            transaction key of this handler
+     * @param executorService
+     *            not referenced by this constructor. Can be <tt>null</tt>.
+     * @param rollbackLoggingLevel
+     *            logging level to use when logging that a transaction
+     *            rollback occurred
+     */
+    public TransactionErrorHandler(CamelContext camelContext, Processor output,
+            ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+            ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) {
+        this.transactionKey = ObjectHelper.getIdentityHashCode(transactionPolicy);
+        this.transactionPolicy = transactionPolicy;
+        this.rollbackLoggingLevel = rollbackLoggingLevel;
+        this.output = output;
+        setExceptionPolicy(exceptionPolicyStrategy);
+    }
+
+    /**
+     * Processes the exchange synchronously, opening a transaction boundary
+     * for it unless its unit of work is already transacted by this handler's
+     * transaction key.
+     *
+     * @param exchange
+     *            the exchange to process
+     * @throws Exception
+     *             declared by the signature
+     */
+    public void process(Exchange exchange) throws Exception {
+        // everything runs on the calling thread: a JTA transaction does *not*
+        // support being spanned across multiple threads
+        final boolean alreadyTransacted = exchange.getUnitOfWork().isTransactedBy(transactionKey);
+        if (!alreadyTransacted) {
+            // not yet inside this handler's transaction boundary, so open it
+            // and invoke the error handler from within it
+            processInTransaction(exchange);
+            return;
+        }
+        // already inside the boundary, just let the error handler process it
+        processByErrorHandler(exchange);
+    }
+
+    /**
+     * Asynchronous entry point that is always served synchronously.
+     * <p/>
+     * Anything thrown by {@link #process(Exchange)} is stored on the exchange
+     * instead of being propagated to the caller.
+     *
+     * @param exchange
+     *            the exchange to process
+     * @param callback
+     *            notified with <tt>true</tt> (done synchronously) once
+     *            processing has finished
+     * @return always <tt>true</tt>
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // delegate to the synchronous variant as a JTA transaction does *not*
+        // support using multiple threads to span a transaction
+        try {
+            process(exchange);
+        } catch (Throwable failure) {
+            exchange.setException(failure);
+        }
+
+        // tell the callback that the work was completed synchronously
+        final boolean doneSync = true;
+        callback.done(doneSync);
+        return doneSync;
+    }
+
+ /**
+ * Runs the exchange inside a transaction boundary identified by the
+ * transaction key and logs begin, commit and rollback.
+ * <p/>
+ * A failure is stored on the exchange rather than rethrown. Afterwards the
+ * {@link Exchange#ROLLBACK_ONLY_LAST} property is removed and, if it was
+ * <tt>true</tt>, the exception on the exchange is cleared.
+ *
+ * @param exchange
+ * the exchange to process
+ * @throws Exception
+ * declared by the signature
+ */
+ protected void processInTransaction(final Exchange exchange) throws Exception {
+ // is the exchange redelivered, for example JMS brokers support such
+ // details; null is reported as "unknown" in the log output
+ Boolean externalRedelivered = exchange.isExternalRedelivered();
+ final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
+ final String ids = ExchangeHelper.logIds(exchange);
+
+ try {
+ // mark the beginning of this transaction boundary
+ exchange.getUnitOfWork().beginTransactedBy(transactionKey);
+
+ // do in transaction
+ logTransactionBegin(redelivered, ids);
+ doInTransactionTemplate(exchange);
+ logTransactionCommit(redelivered, ids);
+
+ } catch (TransactionRolledbackException e) {
+ // do not set as exception, as it is the placeholder exception that
+ // doInTransactionTemplate throws for an exchange that is marked
+ // rollback-only but carries no exception of its own
+ logTransactionRollback(redelivered, ids, null, true);
+ } catch (Throwable e) {
+ // keep the failure on the exchange instead of rethrowing it
+ exchange.setException(e);
+ logTransactionRollback(redelivered, ids, e, false);
+ } finally {
+ // mark the end of this transaction boundary
+ exchange.getUnitOfWork().endTransactedBy(transactionKey);
+ }
+
+ // if it was a local rollback only then remove its marker so an outer
+ // transaction won't see the marker
+ Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
+ if (onlyLast != null && onlyLast) {
+ // we only want this logged at debug level
+ if (log.isDebugEnabled()) {
+ // log exception if there was a cause exception so we have the
+ // stack trace
+ Exception cause = exchange.getException();
+ if (cause != null) {
+ log.debug("Transaction rollback (" + transactionKey + ") redelivered(" + redelivered + ") for "
+ + ids + " due exchange was marked for rollbackOnlyLast and caught: ", cause);
+ } else {
+ log.debug(
+ "Transaction rollback ({}) redelivered({}) for {} "
+ + "due exchange was marked for rollbackOnlyLast",
+ new Object[] { transactionKey, redelivered, ids });
+ }
+ }
+ // remove the caused exception because the exchange was marked as
+ // rollback only last; with the exception removed, any outer
+ // transaction will not be affected
+ exchange.setException(null);
+ }
+ }
+
+ /**
+ * Replaces the transaction policy used to run exchanges.
+ * <p/>
+ * The transaction key is a final field assigned in the constructor and is
+ * not recomputed here.
+ *
+ * @param transactionPolicy
+ * the policy to use from now on
+ */
+ public void setTransactionPolicy(JtaTransactionPolicy transactionPolicy) {
+ this.transactionPolicy = transactionPolicy;
+ }
+
+    /**
+     * Hands the exchange to the transaction policy, which runs the error
+     * handler processing as its unit of work.
+     * <p/>
+     * If the exchange still carries an exception after processing, or is
+     * marked rollback-only, the unit of work throws so that the policy sees
+     * a failure: the exchange's own exception when present, otherwise a new
+     * {@link TransactionRolledbackException}.
+     *
+     * @param exchange
+     *            the exchange to process
+     * @throws Throwable
+     *             whatever the transaction policy propagates
+     */
+    protected void doInTransactionTemplate(final Exchange exchange) throws Throwable {
+        transactionPolicy.run(new JtaTransactionPolicy.Runnable() {
+
+            @Override
+            public void run() throws Throwable {
+                // let the error handler process the exchange first
+                processByErrorHandler(exchange);
+
+                // nothing to signal when the exchange neither failed nor was
+                // marked for rollback
+                final Exception failure = exchange.getException();
+                if (failure == null && !exchange.isRollbackOnly()) {
+                    return;
+                }
+
+                // prefer the real failure; fall back to a placeholder
+                // exception whose only purpose is to signal the rollback
+                final Throwable rollbackCause;
+                if (failure == null) {
+                    rollbackCause = new TransactionRolledbackException();
+                } else {
+                    rollbackCause = failure;
+                }
+
+                if (log.isTraceEnabled()) {
+                    log.trace("Throwing runtime exception to force transaction to rollback on {}",
+                            transactionPolicy);
+                }
+                throw rollbackCause;
+            }
+
+        });
+    }
+
+    /**
+     * Processes the {@link Exchange} by invoking the output processor
+     * synchronously.
+     * <p/>
+     * Anything thrown by the output is rethrown wrapped in a
+     * {@link RuntimeCamelException}.
+     *
+     * @param exchange
+     *            the exchange
+     */
+    protected void processByErrorHandler(final Exchange exchange) {
+        try {
+            output.process(exchange);
+        } catch (Throwable cause) {
+            throw new RuntimeCamelException(cause);
+        }
+    }
+
+    /**
+     * Logs the transaction begin at DEBUG level.
+     *
+     * @param redelivered
+     *            textual redelivery state of the exchange
+     * @param ids
+     *            identifiers of the exchange used in the log line
+     */
+    private void logTransactionBegin(String redelivered, String ids) {
+        if (log.isDebugEnabled()) {
+            // the pattern has no ')' after the last placeholder so that the
+            // parentheses in the message stay balanced
+            log.debug("Transaction begin ({}) redelivered({}) for {}",
+                    new Object[] { transactionKey, redelivered, ids });
+        }
+    }
+
+    /**
+     * Logs the transaction commit.
+     * <p/>
+     * The commit of a redelivered message is logged at INFO level when the
+     * rollback logging level is INFO, WARN or ERROR, so people can see that
+     * the redelivered message was committed this time. Every other commit is
+     * logged at DEBUG level.
+     *
+     * @param redelivered
+     *            textual redelivery state of the exchange; only the value
+     *            <tt>"true"</tt> counts as redelivered
+     * @param ids
+     *            identifiers of the exchange used in the log line
+     */
+    private void logTransactionCommit(String redelivered, String ids) {
+        final Object[] args = new Object[] { transactionKey, redelivered, ids };
+        final boolean promoteToInfo = "true".equals(redelivered)
+                && (rollbackLoggingLevel == LoggingLevel.INFO || rollbackLoggingLevel == LoggingLevel.WARN
+                        || rollbackLoggingLevel == LoggingLevel.ERROR);
+
+        // the pattern has no ')' after the last placeholder so that the
+        // parentheses in the message stay balanced
+        if (promoteToInfo) {
+            log.info("Transaction commit ({}) redelivered({}) for {}", args);
+        } else {
+            log.debug("Transaction commit ({}) redelivered({}) for {}", args);
+        }
+    }
+
+    /**
+     * Logs the transaction rollback at the configured rollback logging level.
+     * <p/>
+     * Nothing is logged when the level is <tt>null</tt> or
+     * {@link LoggingLevel#OFF}.
+     *
+     * @param redelivered
+     *            textual redelivery state of the exchange
+     * @param ids
+     *            identifiers of the exchange used in the log line
+     * @param e
+     *            the failure that caused the rollback; only its message is
+     *            logged. Ignored when <tt>rollbackOnly</tt> is <tt>true</tt>;
+     *            may be <tt>null</tt>.
+     * @param rollbackOnly
+     *            <tt>true</tt> if the rollback happened because the exchange
+     *            was marked rollback-only rather than because of a failure
+     */
+    private void logTransactionRollback(String redelivered, String ids, Throwable e, boolean rollbackOnly) {
+        if (rollbackLoggingLevel == null || rollbackLoggingLevel == LoggingLevel.OFF) {
+            return;
+        }
+
+        // build the message once instead of repeating it for every level
+        final String format;
+        final Object[] args;
+        if (rollbackOnly) {
+            format = "Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly";
+            args = new Object[] { transactionKey, redelivered, ids };
+        } else {
+            format = "Transaction rollback ({}) redelivered({}) for {} caught: {}";
+            // tolerate a missing throwable instead of failing while logging
+            final String message = e != null ? e.getMessage() : null;
+            args = new Object[] { transactionKey, redelivered, ids, message };
+        }
+
+        // the logger itself skips levels that are not enabled
+        switch (rollbackLoggingLevel) {
+        case ERROR:
+            log.error(format, args);
+            break;
+        case WARN:
+            log.warn(format, args);
+            break;
+        case INFO:
+            log.info(format, args);
+            break;
+        case DEBUG:
+            log.debug(format, args);
+            break;
+        case TRACE:
+            log.trace(format, args);
+            break;
+        default:
+            break;
+        }
+    }
+
+ /**
+ * Sets the strategy used for onException handling.
+ * <p/>
+ * NOTE(review): the value is stored in this class's own private field. If
+ * the superclass keeps an exception policy field of its own, inherited
+ * code would not see the value set here - confirm against
+ * ErrorHandlerSupport.
+ *
+ * @param exceptionPolicy
+ * the strategy to use
+ */
+ public void setExceptionPolicy(ExceptionPolicyStrategy exceptionPolicy) {
+ this.exceptionPolicy = exceptionPolicy;
+ }
+
+ /**
+ * Gets the strategy used for onException handling.
+ *
+ * @return the strategy last passed to
+ * {@link #setExceptionPolicy(ExceptionPolicyStrategy)}
+ */
+ public ExceptionPolicyStrategy getExceptionPolicy() {
+ return exceptionPolicy;
+ }
+
+ /**
+ * Gets the processor this error handler delegates exchanges to.
+ *
+ * @return the output processor given to the constructor
+ */
+ @Override
+ public Processor getOutput() {
+ return output;
+ }
+
+ /**
+ * Starts the output processor and then clears the shutdown-preparation
+ * flag.
+ */
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(output);
+ preparingShutdown = false;
+ }
+
+ /**
+ * Intentionally does nothing; the output is stopped in
+ * {@link #doShutdown()} instead.
+ */
+ @Override
+ protected void doStop() throws Exception {
+ // noop, do not stop any services here; that is only done when
+ // shutting down, as the error handler can be context scoped and
+ // should not stop just because a route stops
+ }
+
+ /**
+ * Stops and shuts down the output processor.
+ */
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(output);
+ }
+
+ /**
+ * Reports that this error handler supports transacted exchanges.
+ *
+ * @return always <tt>true</tt>
+ */
+ @Override
+ public boolean supportTransacted() {
+ return true;
+ }
+
+ /**
+ * Tells whether there is an output processor to navigate to.
+ *
+ * @return <tt>true</tt> if the output is not <tt>null</tt>
+ */
+ public boolean hasNext() {
+ return output != null;
+ }
+
+    /**
+     * Returns the processors reachable from this error handler.
+     *
+     * @return a new single-element list holding the output, or <tt>null</tt>
+     *         when {@link #hasNext()} is <tt>false</tt>
+     */
+    @Override
+    public List<Processor> next() {
+        if (hasNext()) {
+            final List<Processor> outputs = new ArrayList<Processor>(1);
+            outputs.add(output);
+            return outputs;
+        }
+        return null;
+    }
+
+ /**
+ * Records that a shutdown is being prepared by setting the
+ * shutdown-preparation flag.
+ * <p/>
+ * Neither argument is used, and the flag is not read anywhere else in this
+ * class.
+ *
+ * @param suspendOnly
+ * not used by this implementation
+ * @param forced
+ * not used by this implementation
+ */
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // prepare for shutdown by raising the flag
+ log.trace("Prepare shutdown on error handler {}", this);
+ preparingShutdown = true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
new file mode 100644
index 0000000..4d17f8a
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
@@ -0,0 +1,121 @@
+package org.apache.camel.cdi.jta;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
+import org.apache.camel.CamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for transaction handling
+ */
+public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionalJtaTransactionPolicy.class);
+
+    /**
+     * Runs the given work, optionally inside a transaction started here.
+     * <p/>
+     * When <tt>isNew</tt> is <tt>true</tt> a transaction is begun before the
+     * work and committed after it. If the work throws, {@link #rollback(boolean)}
+     * is invoked with the same flag and the failure is rethrown unchanged.
+     *
+     * @param runnable
+     *            the work to execute
+     * @param isNew
+     *            whether this call begins and completes its own transaction
+     * @throws Throwable
+     *             whatever the work, begin or commit throws
+     */
+    protected void runWithTransaction(final Runnable runnable, final boolean isNew) throws Throwable {
+        if (isNew) {
+            begin();
+        }
+
+        try {
+            runnable.run();
+        } catch (Throwable failure) {
+            // same handling for every kind of failure: roll back, then let
+            // the original failure continue to the caller
+            rollback(isNew);
+            throw failure;
+        }
+
+        if (isNew) {
+            commit();
+        }
+    }
+
+ /**
+ * Begins a transaction through the transaction manager.
+ *
+ * @throws Exception
+ * whatever the transaction manager throws
+ */
+ private void begin() throws Exception {
+
+ transactionManager.begin();
+
+ }
+
+    /**
+     * Commits through the transaction manager.
+     * <p/>
+     * The checked JTA commit failures handled below are rethrown wrapped in a
+     * {@link CamelException}. Any other failure triggers a rollback attempt
+     * and is rethrown unchanged.
+     *
+     * @throws Exception
+     *             if the commit fails
+     */
+    private void commit() throws Exception {
+        try {
+            transactionManager.commit();
+        } catch (HeuristicMixedException cause) {
+            throw new CamelException("Unable to commit transaction", cause);
+        } catch (HeuristicRollbackException cause) {
+            throw new CamelException("Unable to commit transaction", cause);
+        } catch (RollbackException cause) {
+            throw new CamelException("Unable to commit transaction", cause);
+        } catch (SystemException cause) {
+            throw new CamelException("Unable to commit transaction", cause);
+        } catch (Exception unexpected) {
+            // also covers runtime exceptions
+            rollback(true);
+            throw unexpected;
+        } catch (Error fatal) {
+            rollback(true);
+            throw fatal;
+        }
+    }
+
+    /**
+     * Rolls back the transaction, or only marks it for rollback.
+     * <p/>
+     * Failures of the transaction manager are logged at WARN level and not
+     * propagated.
+     *
+     * @param isNew
+     *            <tt>true</tt> to roll back, <tt>false</tt> to only mark the
+     *            transaction as rollback-only
+     * @throws Exception
+     *             declared by the signature
+     */
+    protected void rollback(boolean isNew) throws Exception {
+        try {
+            if (!isNew) {
+                transactionManager.setRollbackOnly();
+            } else {
+                transactionManager.rollback();
+            }
+        } catch (Throwable failure) {
+            LOG.warn("Could not rollback transaction!", failure);
+        }
+    }
+
+ /**
+ * Suspends through the transaction manager.
+ *
+ * @return whatever the transaction manager's suspend call returns
+ * @throws Exception
+ * whatever the transaction manager throws
+ */
+ protected Transaction suspendTransaction() throws Exception {
+
+ return transactionManager.suspend();
+
+ }
+
+    /**
+     * Resumes a previously suspended transaction.
+     * <p/>
+     * Does nothing for <tt>null</tt>. Failures of the transaction manager
+     * are logged at WARN level and not propagated.
+     *
+     * @param suspendedTransaction
+     *            the transaction to resume, may be <tt>null</tt>
+     */
+    protected void resumeTransaction(final Transaction suspendedTransaction) {
+        if (suspendedTransaction != null) {
+            try {
+                transactionManager.resume(suspendedTransaction);
+            } catch (Throwable failure) {
+                LOG.warn("Could not resume transaction!", failure);
+            }
+        }
+    }
+
+    /**
+     * Tells whether the transaction manager reports a usable transaction.
+     *
+     * @return <tt>false</tt> if the status is
+     *         {@link Status#STATUS_MARKED_ROLLBACK} or
+     *         {@link Status#STATUS_NO_TRANSACTION}, <tt>true</tt> for every
+     *         other status
+     * @throws Exception
+     *             whatever the transaction manager throws when asked for the
+     *             status
+     */
+    protected boolean hasActiveTransaction() throws Exception {
+        // read the status once so both comparisons judge the same value
+        final int status = transactionManager.getStatus();
+        return status != Status.STATUS_MARKED_ROLLBACK && status != Status.STATUS_NO_TRANSACTION;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index df10dde..09719b1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -105,6 +105,7 @@
<cdi-api-1.1-version>1.1</cdi-api-1.1-version>
<cdi-api-1.2-version>1.2</cdi-api-1.2-version>
<cdi-api-2.0-version>2.0-PFD2</cdi-api-2.0-version>
+ <jta-api-1.2-version>1.2</jta-api-1.2-version>
<cglib-bundle-version>3.2.4_1</cglib-bundle-version>
<cglib-version>3.2.4</cglib-version>
<chronicle-bytes-version>1.7.35</chronicle-bytes-version>