You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/02 19:24:20 UTC
[1/3] camel git commit: CAMEL-8424 Fixed transaction propagated when
requires new
Repository: camel
Updated Branches:
refs/heads/camel-2.13.x ae1dcaa8c -> 3fdb0b1bf
refs/heads/camel-2.14.x d74a76364 -> 67f12ca0f
refs/heads/master b23c7aa6e -> 934d0f14a
CAMEL-8424 Fixed transaction propagated when requires new
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/934d0f14
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/934d0f14
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/934d0f14
Branch: refs/heads/master
Commit: 934d0f14a0d4a733d5fdf2310a21de66ebb77626
Parents: b23c7aa
Author: Piotr Klimczak <Kl...@quindell.com>
Authored: Mon Mar 2 17:49:49 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 2 18:55:52 2015 +0100
----------------------------------------------------------------------
.../spring/spi/TransactionErrorHandler.java | 2 +-
.../camel/itest/tx/Jms2RequiresNewTest.java | 118 +++++++++++++++++++
.../itest/tx/Jms2RequiresNewTest-context.xml | 79 +++++++++++++
3 files changed, 198 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/934d0f14/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index a27a73f..27a7afe 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -93,7 +93,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
public void process(Exchange exchange) throws Exception {
// we have to run this synchronously as Spring Transaction does *not* support
// using multiple threads to span a transaction
- if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+ if (transactionTemplate.getPropagationBehavior() != TransactionDefinition.PROPAGATION_REQUIRES_NEW && exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
// already transacted by this transaction template
// so lets just let the error handler process it
processByErrorHandler(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/934d0f14/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
new file mode 100644
index 0000000..c320cc8
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
+
+/**
+ * Unit test will look for the spring .xml file with the same class name
+ * but postfixed with -context.xml as filename.
+ * <p/>
+ * We use Spring Testing for unit test, e.g. we extend AbstractJUnit4SpringContextTests
+ * that is a Spring class.
+ *
+ * @version
+ */
+@ContextConfiguration
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+public class Jms2RequiresNewTest extends AbstractJUnit4SpringContextTests {
+
+ @Autowired
+ private CamelContext camelContext;
+
+ @EndpointInject(uri = "mock:result1")
+ private MockEndpoint result1;
+
+ @EndpointInject(uri = "mock:result2")
+ private MockEndpoint result2;
+
+ @EndpointInject(uri = "mock:dlq")
+ private MockEndpoint dlq;
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate start;
+
+ @Before
+ public void setUpRoute() throws Exception {
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class)
+ .markRollbackOnly();
+
+ from("direct:start").transacted("PROPAGATION_REQUIRES_NEW").to("activemq:queue:start");
+ from("activemq:queue:result1").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result1");
+ from("activemq:queue:result2").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result2");
+ from("activemq:queue:ActiveMQ.DLQ").transacted("PROPAGATION_REQUIRES_NEW").to("mock:dlq");
+
+ from("activemq:queue:start")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result1")
+ .to("direct:route2")
+ .choice()
+ .when(body().contains("Neverland"))
+ .throwException(new RuntimeException("Expected!"));
+
+ from("direct:route2")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result2");
+
+ }
+ });
+ }
+
+ @Test
+ public void testSendThrowingException() throws Exception {
+ result1.expectedMessageCount(0);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(1);
+
+ start.sendBody("Single ticket to Neverland please!");
+
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ result1.assertIsSatisfied();
+ }
+
+ @Test
+ public void testSend() throws Exception {
+ result1.expectedMessageCount(1);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(0);
+
+ start.sendBody("Piotr Klimczak");
+
+ result1.assertIsSatisfied();
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/934d0f14/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
new file mode 100644
index 0000000..7baa7c1
--- /dev/null
+++ b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:broker="http://activemq.apache.org/schema/core"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- use requires-new TX -->
+ <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
+ </bean>
+
+ <!-- setup Atomikos for XA transaction -->
+ <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="my-broker">
+ <!-- when close is called, should we force transactions to terminate or not? -->
+ <property name="forceShutdown" value="false"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+ <property name="transactionTimeout" value="300"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
+ <property name="uniqueResourceName" value="myUniqueResource"/>
+ <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+ </bean>
+
+ <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos -->
+ <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="my-broker">
+ <property name="transactionManager" ref="atomikosTransactionManager"/>
+ <property name="userTransaction" ref="atomikosUserTransaction"/>
+ </bean>
+
+ <!-- this is the ConnectionFactory used to connect to the JMS broker -->
+ <!-- notice how we must use the XA connection factory -->
+ <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+ <property name="brokerURL" value="tcp://localhost:61616"/>
+ </bean>
+
+ <!-- define the activemq Camel component so we can integrate with the AMQ broker below -->
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" depends-on="my-broker">
+ <!-- must indicate that we use transacted acknowledge mode -->
+ <property name="transacted" value="true"/>
+ <!-- refer to the transaction manager -->
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ </bean>
+
+ <!-- setup a local JMS Broker for testing purpose -->
+ <broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost" xmlns="http://activemq.apache.org/schema/core">
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ </transportConnectors>
+ </broker>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ </camelContext>
+
+</beans>
[3/3] camel git commit: CAMEL-8424 Fixed transaction propagated when
requires new
Posted by da...@apache.org.
CAMEL-8424 Fixed transaction propagated when requires new
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fdb0b1b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fdb0b1b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fdb0b1b
Branch: refs/heads/camel-2.13.x
Commit: 3fdb0b1bff96c1506b9f5bd6aabbe583b7a92f9f
Parents: ae1dcaa
Author: Piotr Klimczak <Kl...@quindell.com>
Authored: Mon Mar 2 17:49:49 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 2 19:25:25 2015 +0100
----------------------------------------------------------------------
.../spring/spi/TransactionErrorHandler.java | 2 +-
.../camel/itest/tx/Jms2RequiresNewTest.java | 118 +++++++++++++++++++
.../itest/tx/Jms2RequiresNewTest-context.xml | 79 +++++++++++++
3 files changed, 198 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index fba08af..4898997 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -93,7 +93,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
public void process(Exchange exchange) throws Exception {
// we have to run this synchronously as Spring Transaction does *not* support
// using multiple threads to span a transaction
- if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+ if (transactionTemplate.getPropagationBehavior() != TransactionDefinition.PROPAGATION_REQUIRES_NEW && exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
// already transacted by this transaction template
// so lets just let the error handler process it
processByErrorHandler(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
new file mode 100644
index 0000000..c320cc8
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
+
+/**
+ * Unit test will look for the spring .xml file with the same class name
+ * but postfixed with -context.xml as filename.
+ * <p/>
+ * We use Spring Testing for unit test, e.g. we extend AbstractJUnit4SpringContextTests
+ * that is a Spring class.
+ *
+ * @version
+ */
+@ContextConfiguration
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+public class Jms2RequiresNewTest extends AbstractJUnit4SpringContextTests {
+
+ @Autowired
+ private CamelContext camelContext;
+
+ @EndpointInject(uri = "mock:result1")
+ private MockEndpoint result1;
+
+ @EndpointInject(uri = "mock:result2")
+ private MockEndpoint result2;
+
+ @EndpointInject(uri = "mock:dlq")
+ private MockEndpoint dlq;
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate start;
+
+ @Before
+ public void setUpRoute() throws Exception {
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class)
+ .markRollbackOnly();
+
+ from("direct:start").transacted("PROPAGATION_REQUIRES_NEW").to("activemq:queue:start");
+ from("activemq:queue:result1").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result1");
+ from("activemq:queue:result2").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result2");
+ from("activemq:queue:ActiveMQ.DLQ").transacted("PROPAGATION_REQUIRES_NEW").to("mock:dlq");
+
+ from("activemq:queue:start")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result1")
+ .to("direct:route2")
+ .choice()
+ .when(body().contains("Neverland"))
+ .throwException(new RuntimeException("Expected!"));
+
+ from("direct:route2")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result2");
+
+ }
+ });
+ }
+
+ @Test
+ public void testSendThrowingException() throws Exception {
+ result1.expectedMessageCount(0);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(1);
+
+ start.sendBody("Single ticket to Neverland please!");
+
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ result1.assertIsSatisfied();
+ }
+
+ @Test
+ public void testSend() throws Exception {
+ result1.expectedMessageCount(1);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(0);
+
+ start.sendBody("Piotr Klimczak");
+
+ result1.assertIsSatisfied();
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3fdb0b1b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
new file mode 100644
index 0000000..7baa7c1
--- /dev/null
+++ b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:broker="http://activemq.apache.org/schema/core"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- use requires-new TX -->
+ <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
+ </bean>
+
+ <!-- setup Atomikos for XA transaction -->
+ <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="my-broker">
+ <!-- when close is called, should we force transactions to terminate or not? -->
+ <property name="forceShutdown" value="false"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+ <property name="transactionTimeout" value="300"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
+ <property name="uniqueResourceName" value="myUniqueResource"/>
+ <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+ </bean>
+
+ <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos -->
+ <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="my-broker">
+ <property name="transactionManager" ref="atomikosTransactionManager"/>
+ <property name="userTransaction" ref="atomikosUserTransaction"/>
+ </bean>
+
+ <!-- this is the ConnectionFactory used to connect to the JMS broker -->
+ <!-- notice how we must use the XA connection factory -->
+ <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+ <property name="brokerURL" value="tcp://localhost:61616"/>
+ </bean>
+
+ <!-- define the activemq Camel component so we can integrate with the AMQ broker below -->
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" depends-on="my-broker">
+ <!-- must indicate that we use transacted acknowledge mode -->
+ <property name="transacted" value="true"/>
+ <!-- refer to the transaction manager -->
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ </bean>
+
+ <!-- setup a local JMS Broker for testing purpose -->
+ <broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost" xmlns="http://activemq.apache.org/schema/core">
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ </transportConnectors>
+ </broker>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ </camelContext>
+
+</beans>
[2/3] camel git commit: CAMEL-8424 Fixed transaction propagated when
requires new
Posted by da...@apache.org.
CAMEL-8424 Fixed transaction propagated when requires new
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/67f12ca0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/67f12ca0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/67f12ca0
Branch: refs/heads/camel-2.14.x
Commit: 67f12ca0fce2445538281b6846039829b664cff1
Parents: d74a763
Author: Piotr Klimczak <Kl...@quindell.com>
Authored: Mon Mar 2 17:49:49 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 2 19:25:13 2015 +0100
----------------------------------------------------------------------
.../spring/spi/TransactionErrorHandler.java | 2 +-
.../camel/itest/tx/Jms2RequiresNewTest.java | 118 +++++++++++++++++++
.../itest/tx/Jms2RequiresNewTest-context.xml | 79 +++++++++++++
3 files changed, 198 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/67f12ca0/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index fba08af..4898997 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -93,7 +93,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
public void process(Exchange exchange) throws Exception {
// we have to run this synchronously as Spring Transaction does *not* support
// using multiple threads to span a transaction
- if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+ if (transactionTemplate.getPropagationBehavior() != TransactionDefinition.PROPAGATION_REQUIRES_NEW && exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
// already transacted by this transaction template
// so lets just let the error handler process it
processByErrorHandler(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/67f12ca0/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
new file mode 100644
index 0000000..c320cc8
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/Jms2RequiresNewTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
+
+/**
+ * Unit test will look for the spring .xml file with the same class name
+ * but postfixed with -context.xml as filename.
+ * <p/>
+ * We use Spring Testing for unit test, e.g. we extend AbstractJUnit4SpringContextTests
+ * that is a Spring class.
+ *
+ * @version
+ */
+@ContextConfiguration
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+public class Jms2RequiresNewTest extends AbstractJUnit4SpringContextTests {
+
+ @Autowired
+ private CamelContext camelContext;
+
+ @EndpointInject(uri = "mock:result1")
+ private MockEndpoint result1;
+
+ @EndpointInject(uri = "mock:result2")
+ private MockEndpoint result2;
+
+ @EndpointInject(uri = "mock:dlq")
+ private MockEndpoint dlq;
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate start;
+
+ @Before
+ public void setUpRoute() throws Exception {
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class)
+ .markRollbackOnly();
+
+ from("direct:start").transacted("PROPAGATION_REQUIRES_NEW").to("activemq:queue:start");
+ from("activemq:queue:result1").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result1");
+ from("activemq:queue:result2").transacted("PROPAGATION_REQUIRES_NEW").to("mock:result2");
+ from("activemq:queue:ActiveMQ.DLQ").transacted("PROPAGATION_REQUIRES_NEW").to("mock:dlq");
+
+ from("activemq:queue:start")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result1")
+ .to("direct:route2")
+ .choice()
+ .when(body().contains("Neverland"))
+ .throwException(new RuntimeException("Expected!"));
+
+ from("direct:route2")
+ .transacted("PROPAGATION_REQUIRES_NEW")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .to("activemq:queue:result2");
+
+ }
+ });
+ }
+
+ @Test
+ public void testSendThrowingException() throws Exception {
+ result1.expectedMessageCount(0);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(1);
+
+ start.sendBody("Single ticket to Neverland please!");
+
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ result1.assertIsSatisfied();
+ }
+
+ @Test
+ public void testSend() throws Exception {
+ result1.expectedMessageCount(1);
+ result2.expectedMessageCount(1);
+ dlq.expectedMessageCount(0);
+
+ start.sendBody("Piotr Klimczak");
+
+ result1.assertIsSatisfied();
+ result2.assertIsSatisfied();
+ dlq.assertIsSatisfied();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/67f12ca0/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
new file mode 100644
index 0000000..7baa7c1
--- /dev/null
+++ b/tests/camel-itest/src/test/resources/org/apache/camel/itest/tx/Jms2RequiresNewTest-context.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:broker="http://activemq.apache.org/schema/core"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- use requires-new TX -->
+ <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
+ </bean>
+
+ <!-- setup Atomikos for XA transaction -->
+ <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="my-broker">
+ <!-- when close is called, should we force transactions to terminate or not? -->
+ <property name="forceShutdown" value="false"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+ <property name="transactionTimeout" value="300"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
+ <property name="uniqueResourceName" value="myUniqueResource"/>
+ <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+ </bean>
+
+ <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos -->
+ <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="my-broker">
+ <property name="transactionManager" ref="atomikosTransactionManager"/>
+ <property name="userTransaction" ref="atomikosUserTransaction"/>
+ </bean>
+
+ <!-- this is the ConnectionFactory used to connect to the JMS broker -->
+ <!-- notice how we must use the XA connection factory -->
+ <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+ <property name="brokerURL" value="tcp://localhost:61616"/>
+ </bean>
+
+ <!-- define the activemq Camel component so we can integrate with the AMQ broker below -->
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" depends-on="my-broker">
+ <!-- must indicate that we use transacted acknowledge mode -->
+ <property name="transacted" value="true"/>
+ <!-- refer to the transaction manager -->
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ </bean>
+
+ <!-- setup a local JMS Broker for testing purpose -->
+ <broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost" xmlns="http://activemq.apache.org/schema/core">
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ </transportConnectors>
+ </broker>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ </camelContext>
+
+</beans>