You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Johan Blomgren (JIRA)" <ji...@apache.org> on 2015/04/21 13:15:58 UTC
[jira] [Updated] (CAMEL-8680) Transaction REQUIRES_NEW does not
work with multicast
[ https://issues.apache.org/jira/browse/CAMEL-8680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Johan Blomgren updated CAMEL-8680:
----------------------------------
Description:
I do not get this to work, trying to multicast a transacted message.
Here are 2 tests, both behaving a bit differently, the first uses a xml context and the other uses plain java.
{code:title=JmsTransactionMulticast.java}
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
/**
* Unit test will look for the spring .xml file with the same class name
* but postfixed with -config.xml as filename.
* <p/>
* We use Spring Testing for unit test, eg we extend AbstractJUnit4SpringContextTests
* that is a Spring class.
*/
@ContextConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {
@Autowired
private CamelContext camelContext;
@EndpointInject(uri = "mock:result1")
private MockEndpoint result1;
@EndpointInject(uri = "mock:result2")
private MockEndpoint result2;
@EndpointInject(uri = "mock:dlq")
private MockEndpoint dlq;
@EndpointInject(uri = "direct:start")
private ProducerTemplate start;
@EndpointInject(uri = "direct:start2")
private ProducerTemplate start2;
@Before
public void setUp() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();
// Multicast route
from("direct:start")
.transacted("PROPAGATION_REQUIRES_NEW")
.multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
// Route that does not multicast
from("direct:start2")
.transacted("PROPAGATION_REQUIRES_NEW")
.to("direct:2")
;
from("direct:1")
.choice()
.when(body().isEqualTo("a")).to("mock:result1")
.otherwise().throwException(new Exception("fail in |"));
;
from("direct:2")
.choice()
.when(body().isEqualTo("b")).to("mock:result2")
.otherwise().throwException(new Exception("fail in 2"));
;
}
});
}
// this does work, it rollbacks properly
@Test
public void start2Send_rollback() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start2.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:2
@Test
public void start1Send_rollback() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:1
@Test
public void testSend_rollback2() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("b");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
}
{code}
{code:title=JmsTransactionMulticast-context.xml}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- use required TX -->
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>
<!-- setup JMS connection factory -->
<bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
</bean>
<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="poolConnectionFactory"/>
</bean>
<!-- define our activemq component -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory" ref="poolConnectionFactory"/>
<!-- define the jms consumer/producer as transacted -->
<property name="transacted" value="true"/>
<!-- setup the transaction manager to use -->
<!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
for instance use a JTA transaction manager then you must configure it -->
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
</camelContext>
</beans>
{code}
And here is another test that does not use xml configuration and behaves a bit differently
{code:title=JmsTransactionMulticast2.java}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
public class JmsTransactionMulticast2 extends CamelTestSupport {
private final String a1 = "wmq:queue:A1";
private final String a2 = "wmq:queue:A2";
private final String errorQueue = "wmq:queue:AERROR";
private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";
@EndpointInject(uri = "mock:result1")
private MockEndpoint result1;
@EndpointInject(uri = "mock:result2")
private MockEndpoint result2;
@EndpointInject(uri = "mock:dlq")
private MockEndpoint dlq;
@EndpointInject(uri = "direct:start")
private ProducerTemplate start;
@EndpointInject(uri = "direct:start2")
private ProducerTemplate start2;
public void addRoutes() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class).handled(true).to(errorQueue).markRollbackOnlyLast();
from(a1).to("mock:result1");
from(a2).to("mock:result2");
from(errorQueue).to("mock:dlq");
// Multicast route
from("direct:start")
.transacted("PROPAGATION_REQUIRES_NEW")
.multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
// Route that does not multicast
from("direct:start2")
.transacted("PROPAGATION_REQUIRES_NEW")
.to("direct:2")
;
from("direct:1")
.choice()
.when(body().isEqualTo("a")).to(a1)
.otherwise().throwException(new Exception("fail in |"));
;
from("direct:2")
.choice()
.when(body().isEqualTo("b")).to(a2)
.otherwise().throwException(new Exception("fail in 2"));
;
}
});
}
@Test
public void start2Send_rollback() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start2.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:2
@Test
public void start1Send_rollback() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:1
@Test
public void testSend_rollback2() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("b");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
addWmqComponent(camelContext);
return camelContext;
}
final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final PlatformTransactionManager transactionManager = (PlatformTransactionManager) new JmsTransactionManager(connectionFactory);
private void addWmqComponent(CamelContext camelContext) {
final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
component.setConcurrentConsumers(1);
camelContext.addComponent("wmq", component);
}
@Override
protected JndiRegistry createRegistry() throws Exception {
final JndiRegistry registry = super.createRegistry();
registry.bind("jmsTransactionManager", transactionManager);
registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
return registry;
}
private SpringTransactionPolicy createTransactionPolicy(String propagation) {
final SpringTransactionPolicy policy = new SpringTransactionPolicy();
policy.setTransactionManager(transactionManager);
policy.setPropagationBehaviorName(propagation);
return policy;
}
}
{code}
was:
I did not manage to do a pull request but heres a test which in my opinion should work:
{code:title=JmsTransactionMulticast.java}
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
/**
* Unit test will look for the spring .xml file with the same class name
* but postfixed with -config.xml as filename.
* <p/>
* We use Spring Testing for unit test, eg we extend AbstractJUnit4SpringContextTests
* that is a Spring class.
*/
@ContextConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {
@Autowired
private CamelContext camelContext;
@EndpointInject(uri = "mock:result1")
private MockEndpoint result1;
@EndpointInject(uri = "mock:result2")
private MockEndpoint result2;
@EndpointInject(uri = "mock:dlq")
private MockEndpoint dlq;
@EndpointInject(uri = "direct:start")
private ProducerTemplate start;
@EndpointInject(uri = "direct:start2")
private ProducerTemplate start2;
@Before
public void setUp() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();
// Multicast route
from("direct:start")
.transacted("PROPAGATION_REQUIRES_NEW")
.multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
// Route that does not multicast
from("direct:start2")
.transacted("PROPAGATION_REQUIRES_NEW")
.to("direct:2")
;
from("direct:1")
.choice()
.when(body().isEqualTo("a")).to("mock:result1")
.otherwise().throwException(new Exception("fail in |"));
;
from("direct:2")
.choice()
.when(body().isEqualTo("b")).to("mock:result2")
.otherwise().throwException(new Exception("fail in 2"));
;
}
});
}
// this does work, it rollbacks properly
@Test
public void start2Send_rollback() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start2.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:2
@Test
public void start1Send_rollback() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:1
@Test
public void testSend_rollback2() throws Exception {
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("b");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
}
{code}
{code:title=JmsTransactionMulticast-context.xml}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- use required TX -->
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>
<!-- setup JMS connection factory -->
<bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
</bean>
<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="poolConnectionFactory"/>
</bean>
<!-- define our activemq component -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory" ref="poolConnectionFactory"/>
<!-- define the jms consumer/producer as transacted -->
<property name="transacted" value="true"/>
<!-- setup the transaction manager to use -->
<!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
for instance use a JTA transaction manager then you must configure it -->
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
</camelContext>
</beans>
{code}
And here is another test that does not use xml configuration and behaves a bit differently
{code:title=JmsTransactionMulticast2.java}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
public class JmsTransactionMulticast2 extends CamelTestSupport {
private final String a1 = "wmq:queue:A1";
private final String a2 = "wmq:queue:A2";
private final String errorQueue = "wmq:queue:AERROR";
private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";
@EndpointInject(uri = "mock:result1")
private MockEndpoint result1;
@EndpointInject(uri = "mock:result2")
private MockEndpoint result2;
@EndpointInject(uri = "mock:dlq")
private MockEndpoint dlq;
@EndpointInject(uri = "direct:start")
private ProducerTemplate start;
@EndpointInject(uri = "direct:start2")
private ProducerTemplate start2;
public void addRoutes() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class).handled(true).to(errorQueue).markRollbackOnlyLast();
from(a1).to("mock:result1");
from(a2).to("mock:result2");
from(errorQueue).to("mock:dlq");
// Multicast route
from("direct:start")
.transacted("PROPAGATION_REQUIRES_NEW")
.multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
// Route that does not multicast
from("direct:start2")
.transacted("PROPAGATION_REQUIRES_NEW")
.to("direct:2")
;
from("direct:1")
.choice()
.when(body().isEqualTo("a")).to(a1)
.otherwise().throwException(new Exception("fail in |"));
;
from("direct:2")
.choice()
.when(body().isEqualTo("b")).to(a2)
.otherwise().throwException(new Exception("fail in 2"));
;
}
});
}
@Test
public void start2Send_rollback() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start2.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:2
@Test
public void start1Send_rollback() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("a");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
//This will throw an exception in direct:1
@Test
public void testSend_rollback2() throws Exception {
addRoutes();
result1.expectedMessageCount(0);
result2.expectedMessageCount(0);
dlq.expectedMessageCount(1);
start.sendBody("b");
result1.assertIsSatisfied();
result2.assertIsSatisfied();
dlq.assertIsSatisfied();
}
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
addWmqComponent(camelContext);
return camelContext;
}
final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final PlatformTransactionManager transactionManager = (PlatformTransactionManager) new JmsTransactionManager(connectionFactory);
private void addWmqComponent(CamelContext camelContext) {
final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
component.setConcurrentConsumers(1);
camelContext.addComponent("wmq", component);
}
@Override
protected JndiRegistry createRegistry() throws Exception {
final JndiRegistry registry = super.createRegistry();
registry.bind("jmsTransactionManager", transactionManager);
registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
return registry;
}
private SpringTransactionPolicy createTransactionPolicy(String propagation) {
final SpringTransactionPolicy policy = new SpringTransactionPolicy();
policy.setTransactionManager(transactionManager);
policy.setPropagationBehaviorName(propagation);
return policy;
}
}
{code}
> Transaction REQUIRES_NEW does not work with multicast
> -----------------------------------------------------
>
> Key: CAMEL-8680
> URL: https://issues.apache.org/jira/browse/CAMEL-8680
> Project: Camel
> Issue Type: Bug
> Affects Versions: 2.15.1
> Reporter: Johan Blomgren
>
> I do not get this to work, trying to multicast a transacted message.
> Here are 2 tests, both behaving a bit differently, the first uses a xml context and the other uses plain java.
> {code:title=JmsTransactionMulticast.java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.EndpointInject;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.junit.Before;
> import org.junit.Test;
> import org.springframework.beans.factory.annotation.Autowired;
> import org.springframework.test.annotation.DirtiesContext;
> import org.springframework.test.context.ContextConfiguration;
> import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
> /**
> * Unit test will look for the spring .xml file with the same class name
> * but postfixed with -config.xml as filename.
> * <p/>
> * We use Spring Testing for unit test, eg we extend AbstractJUnit4SpringContextTests
> * that is a Spring class.
> */
> @ContextConfiguration
> @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
> public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {
> @Autowired
> private CamelContext camelContext;
> @EndpointInject(uri = "mock:result1")
> private MockEndpoint result1;
> @EndpointInject(uri = "mock:result2")
> private MockEndpoint result2;
> @EndpointInject(uri = "mock:dlq")
> private MockEndpoint dlq;
> @EndpointInject(uri = "direct:start")
> private ProducerTemplate start;
> @EndpointInject(uri = "direct:start2")
> private ProducerTemplate start2;
> @Before
> public void setUp() throws Exception {
> camelContext.addRoutes(new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();
> // Multicast route
> from("direct:start")
> .transacted("PROPAGATION_REQUIRES_NEW")
> .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
> // Route that does not multicast
> from("direct:start2")
> .transacted("PROPAGATION_REQUIRES_NEW")
> .to("direct:2")
> ;
> from("direct:1")
> .choice()
> .when(body().isEqualTo("a")).to("mock:result1")
> .otherwise().throwException(new Exception("fail in |"));
> ;
> from("direct:2")
> .choice()
> .when(body().isEqualTo("b")).to("mock:result2")
> .otherwise().throwException(new Exception("fail in 2"));
> ;
> }
> });
> }
> // this does work, it rollbacks properly
> @Test
> public void start2Send_rollback() throws Exception {
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start2.sendBody("a");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> //This will throw an exception in direct:2
> @Test
> public void start1Send_rollback() throws Exception {
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start.sendBody("a");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> //This will throw an exception in direct:1
> @Test
> public void testSend_rollback2() throws Exception {
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start.sendBody("b");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> }
> {code}
> {code:title=JmsTransactionMulticast-context.xml}
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xmlns:broker="http://activemq.apache.org/schema/core"
> xsi:schemaLocation="
> http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
> http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
> http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
> <!-- use required TX -->
> <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
> <property name="transactionManager" ref="jmsTransactionManager"/>
> <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
> </bean>
> <!-- setup JMS connection factory -->
> <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
> <property name="maxConnections" value="8"/>
> <property name="connectionFactory" ref="jmsConnectionFactory"/>
> </bean>
> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
> <property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
> </bean>
> <!-- setup spring jms TX manager -->
> <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
> <property name="connectionFactory" ref="poolConnectionFactory"/>
> </bean>
> <!-- define our activemq component -->
> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
> <property name="connectionFactory" ref="poolConnectionFactory"/>
> <!-- define the jms consumer/producer as transacted -->
> <property name="transacted" value="true"/>
> <!-- setup the transaction manager to use -->
> <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
> for instance use a JTA transaction manager then you must configure it -->
> <property name="transactionManager" ref="jmsTransactionManager"/>
> </bean>
> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
> </camelContext>
> </beans>
> {code}
> And here is another test that does not use xml configuration and behaves a bit differently
> {code:title=JmsTransactionMulticast2.java}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.CamelContext;
> import org.apache.camel.EndpointInject;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.jms.JmsComponent;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
> import org.springframework.jms.connection.JmsTransactionManager;
> import org.springframework.transaction.PlatformTransactionManager;
> public class JmsTransactionMulticast2 extends CamelTestSupport {
> private final String a1 = "wmq:queue:A1";
> private final String a2 = "wmq:queue:A2";
> private final String errorQueue = "wmq:queue:AERROR";
> private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";
> @EndpointInject(uri = "mock:result1")
> private MockEndpoint result1;
> @EndpointInject(uri = "mock:result2")
> private MockEndpoint result2;
> @EndpointInject(uri = "mock:dlq")
> private MockEndpoint dlq;
> @EndpointInject(uri = "direct:start")
> private ProducerTemplate start;
> @EndpointInject(uri = "direct:start2")
> private ProducerTemplate start2;
> public void addRoutes() throws Exception {
> context.addRoutes(new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> onException(Exception.class).handled(true).to(errorQueue).markRollbackOnlyLast();
> from(a1).to("mock:result1");
> from(a2).to("mock:result2");
> from(errorQueue).to("mock:dlq");
> // Multicast route
> from("direct:start")
> .transacted("PROPAGATION_REQUIRES_NEW")
> .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
> // Route that does not multicast
> from("direct:start2")
> .transacted("PROPAGATION_REQUIRES_NEW")
> .to("direct:2")
> ;
> from("direct:1")
> .choice()
> .when(body().isEqualTo("a")).to(a1)
> .otherwise().throwException(new Exception("fail in |"));
> ;
> from("direct:2")
> .choice()
> .when(body().isEqualTo("b")).to(a2)
> .otherwise().throwException(new Exception("fail in 2"));
> ;
> }
> });
> }
> @Test
> public void start2Send_rollback() throws Exception {
> addRoutes();
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start2.sendBody("a");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> //This will throw an exception in direct:2
> @Test
> public void start1Send_rollback() throws Exception {
> addRoutes();
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start.sendBody("a");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> //This will throw an exception in direct:1
> @Test
> public void testSend_rollback2() throws Exception {
> addRoutes();
> result1.expectedMessageCount(0);
> result2.expectedMessageCount(0);
> dlq.expectedMessageCount(1);
> start.sendBody("b");
> result1.assertIsSatisfied();
> result2.assertIsSatisfied();
> dlq.assertIsSatisfied();
> }
> @Override
> protected CamelContext createCamelContext() throws Exception {
> CamelContext camelContext = super.createCamelContext();
> addWmqComponent(camelContext);
> return camelContext;
> }
> final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
> final PlatformTransactionManager transactionManager = (PlatformTransactionManager) new JmsTransactionManager(connectionFactory);
> private void addWmqComponent(CamelContext camelContext) {
> final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
> component.setConcurrentConsumers(1);
> camelContext.addComponent("wmq", component);
> }
> @Override
> protected JndiRegistry createRegistry() throws Exception {
> final JndiRegistry registry = super.createRegistry();
> registry.bind("jmsTransactionManager", transactionManager);
> registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
> return registry;
> }
> private SpringTransactionPolicy createTransactionPolicy(String propagation) {
> final SpringTransactionPolicy policy = new SpringTransactionPolicy();
> policy.setTransactionManager(transactionManager);
> policy.setPropagationBehaviorName(propagation);
> return policy;
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)