You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/02 19:24:22 UTC

[3/3] camel git commit: CAMEL-8424 Fixed transaction propagated when requires new

CAMEL-8424 Fixed transaction propagated when requires new


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fdb0b1b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fdb0b1b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fdb0b1b

Branch: refs/heads/camel-2.13.x
Commit: 3fdb0b1bff96c1506b9f5bd6aabbe583b7a92f9f
Parents: ae1dcaa
Author: Piotr Klimczak <Kl...@quindell.com>
Authored: Mon Mar 2 17:49:49 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 2 19:25:25 2015 +0100

----------------------------------------------------------------------
 .../spring/spi/TransactionErrorHandler.java     |   2 +-
 .../camel/itest/tx/Jms2RequiresNewTest.java     | 118 +++++++++++++++++++
 .../itest/tx/Jms2RequiresNewTest-context.xml    |  79 +++++++++++++
 3 files changed, 198 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index fba08af..4898997 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -93,7 +93,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
     public void process(Exchange exchange) throws Exception {
         // we have to run this synchronously as Spring Transaction does *not* support
         // using multiple threads to span a transaction
-        if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+        if (transactionTemplate.getPropagationBehavior() != TransactionDefinition.PROPAGATION_REQUIRES_NEW && exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
             // already transacted by this transaction template
             // so lets just let the error handler process it
             processByErrorHandler(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
new file mode 100644
index 0000000..c320cc8
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
+
+/**
+ * Unit test will look for the spring .xml file with the same class name
+ * but postfixed with -context.xml as filename.
+ * <p/>
+ * We use Spring Testing for the unit test, i.e. we extend AbstractJUnit4SpringContextTests
+ * that is a Spring class.
+ * 
+ * @version 
+ */
+@ContextConfiguration
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+public class Jms2RequiresNewTest extends AbstractJUnit4SpringContextTests {
+
+    @Autowired
+    private CamelContext camelContext;
+
+    @EndpointInject(uri = "mock:result1")
+    private MockEndpoint result1;
+
+    @EndpointInject(uri = "mock:result2")
+    private MockEndpoint result2;
+
+    @EndpointInject(uri = "mock:dlq")
+    private MockEndpoint dlq;
+
+    @EndpointInject(uri = "direct:start")
+    private ProducerTemplate start;
+
+    @Before
+    public void setUpRoute() throws Exception {
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .markRollbackOnly();
+
+                from("direct:start").transacted("PROPAGATION_REQUIRES_NEW").to("activemq:queue:start");
+                from("activemq:queue:result1").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result1");
+                from("activemq:queue:result2").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result2");
+                from("activemq:queue:ActiveMQ.DLQ").transacted("PROPAGATION_REQUIRES_NEW").to("mock:dlq");
+
+                from("activemq:queue:start")
+                        .transacted("PROPAGATION_REQUIRES_NEW")
+                        .setExchangePattern(ExchangePattern.InOnly)
+                        .to("activemq:queue:result1")
+                        .to("direct:route2")
+                        .choice()
+                            .when(body().contains("Neverland"))
+                                .throwException(new RuntimeException("Expected!"));
+
+                from("direct:route2")
+                        .transacted("PROPAGATION_REQUIRES_NEW")
+                        .setExchangePattern(ExchangePattern.InOnly)
+                        .to("activemq:queue:result2");
+
+            }
+        });
+    }
+
+    @Test
+    public void testSendThrowingException() throws Exception {
+        result1.expectedMessageCount(0);
+        result2.expectedMessageCount(1);
+        dlq.expectedMessageCount(1);
+
+        start.sendBody("Single ticket to Neverland please!");
+
+        result2.assertIsSatisfied();
+        dlq.assertIsSatisfied();
+        result1.assertIsSatisfied();
+    }
+
+    @Test
+    public void testSend() throws Exception {
+        result1.expectedMessageCount(1);
+        result2.expectedMessageCount(1);
+        dlq.expectedMessageCount(0);
+
+        start.sendBody("Piotr Klimczak");
+
+        result1.assertIsSatisfied();
+        result2.assertIsSatisfied();
+        dlq.assertIsSatisfied();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
new file mode 100644
index 0000000..7baa7c1
--- /dev/null
+++ b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:broker="http://activemq.apache.org/schema/core"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- use PROPAGATION_REQUIRES_NEW TX -->
+    <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+        <property name="transactionManager" ref="jtaTransactionManager"/>
+        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
+    </bean>
+
+    <!-- setup Atomikos for XA transaction -->
+    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="my-broker">
+        <!-- when close is called, should we force transactions to terminate or not? -->
+        <property name="forceShutdown" value="false"/>
+    </bean>
+
+    <!-- this is some atomikos setup you must do -->
+    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+        <property name="transactionTimeout" value="300"/>
+    </bean>
+
+    <!-- wrap the ActiveMQ XA connection factory (defined below) in the Atomikos connection factory bean -->
+    <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
+        <property name="uniqueResourceName" value="myUniqueResource"/>
+        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+    </bean>
+
+    <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos -->
+    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="my-broker">
+        <property name="transactionManager" ref="atomikosTransactionManager"/>
+        <property name="userTransaction" ref="atomikosUserTransaction"/>
+    </bean>
+
+    <!-- Is the ConnectionFactory to connect to the JMS broker -->
+    <!-- notice how we must use the XA connection factory -->
+    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+        <property name="brokerURL" value="tcp://localhost:61616"/>
+    </bean>
+
+    <!-- define the activemq Camel component so we can integrate with the AMQ broker below -->
+    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" depends-on="my-broker">
+        <!-- must indicate that we use transacted acknowledge mode -->
+        <property name="transacted" value="true"/>
+        <!-- refer to the transaction manager -->
+        <property name="transactionManager" ref="jtaTransactionManager"/>
+    </bean>
+
+    <!-- setup a local JMS Broker for testing purpose -->
+    <broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost" xmlns="http://activemq.apache.org/schema/core">
+        <transportConnectors>
+            <transportConnector uri="tcp://localhost:61616"/>
+        </transportConnectors>
+    </broker>
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+    </camelContext>
+
+</beans>