You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Johan Blomgren (JIRA)" <ji...@apache.org> on 2015/04/21 13:15:58 UTC

[jira] [Updated] (CAMEL-8680) Transaction REQUIRES_NEW does not work with multicast

     [ https://issues.apache.org/jira/browse/CAMEL-8680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johan Blomgren updated CAMEL-8680:
----------------------------------
    Description: 
I cannot get this to work when trying to multicast a transacted message.

Here are two tests that behave slightly differently: the first uses an XML context and the second uses plain Java.

{code:title=JmsTransactionMulticast.java}
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

/**
 * Reproducer for CAMEL-8680: routes that are transacted with the
 * PROPAGATION_REQUIRES_NEW policy, one of which multicasts to two sub-routes.
 * <p/>
 * The unit test will look for the Spring XML file with the same class name
 * but postfixed with -context.xml as filename (JmsTransactionMulticast-context.xml).
 * <p/>
 * We use Spring Testing for the unit test, i.e. we extend AbstractJUnit4SpringContextTests,
 * which is a Spring class.
 */
@ContextConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {

    /** Bean id of the SpringTransactionPolicy declared in JmsTransactionMulticast-context.xml. */
    private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";

    @Autowired
    private CamelContext camelContext;

    @EndpointInject(uri = "mock:result1")
    private MockEndpoint result1;

    @EndpointInject(uri = "mock:result2")
    private MockEndpoint result2;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @EndpointInject(uri = "direct:start")
    private ProducerTemplate start;

    @EndpointInject(uri = "direct:start2")
    private ProducerTemplate start2;

    /**
     * Adds the routes under test to the injected Camel context before each test method.
     *
     * @throws Exception if the routes cannot be added
     */
    @Before
    public void setUp() throws Exception {
        camelContext.addRoutes(new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Every exception is handled, forwarded to mock:dlq, and the
                // transaction is then marked rollback-only.
                onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();

                // Multicast route
                from("direct:start")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");

                // Route that does not multicast
                from("direct:start2")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .to("direct:2");

                // Accepts only body "a"; anything else fails.
                from("direct:1")
                    .choice()
                    .when(body().isEqualTo("a")).to("mock:result1")
                    .otherwise().throwException(new Exception("fail in 1"));

                // Accepts only body "b"; anything else fails.
                from("direct:2")
                    .choice()
                    .when(body().isEqualTo("b")).to("mock:result2")
                    .otherwise().throwException(new Exception("fail in 2"));
            }
        });
    }

    // Non-multicast route; body "a" is rejected by direct:2.
    // Per the reporter this case does work: it rolls back properly.
    @Test
    public void start2Send_rollback() throws Exception {
        sendAndExpectRollback(start2, "a");
    }

    // Multicast route; body "a" is accepted by direct:1 and
    // will throw an exception in direct:2.
    @Test
    public void start1Send_rollback() throws Exception {
        sendAndExpectRollback(start, "a");
    }

    // Multicast route; body "b" will throw an exception in direct:1.
    @Test
    public void testSend_rollback2() throws Exception {
        sendAndExpectRollback(start, "b");
    }

    /**
     * Sends {@code body} through {@code producer} and asserts the outcome every test in this
     * class expects: nothing on either result mock and exactly one message on the dlq mock.
     *
     * @param producer template bound to the route entry point to exercise
     * @param body     message body to send
     * @throws Exception if sending fails or an expectation is not satisfied
     */
    private void sendAndExpectRollback(ProducerTemplate producer, String body) throws Exception {
        // Expectations must be registered before the message is sent.
        result1.expectedMessageCount(0);
        result2.expectedMessageCount(0);
        dlq.expectedMessageCount(1);

        producer.sendBody(body);

        result1.assertIsSatisfied();
        result2.assertIsSatisfied();
        dlq.assertIsSatisfied();
    }

}
{code}

{code:title=JmsTransactionMulticast-context.xml}
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- transaction policy using PROPAGATION_REQUIRES_NEW; the routes reference it by this bean id in transacted(...) -->
  <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
  </bean>

  <!-- setup JMS connection factory -->
  <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
  </bean>

  <!-- setup spring jms TX manager -->
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
  </bean>

  <!-- define our activemq component -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
    <!-- define the jms consumer/producer as transacted -->
    <property name="transacted" value="true"/>
    <!-- setup the transaction manager to use -->
    <!-- if not provided then Camel will automatically use a JmsTransactionManager; however, if you
         for instance use a JTA transaction manager then you must configure it -->
    <property name="transactionManager" ref="jmsTransactionManager"/>
  </bean>

  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

</beans>
{code}

And here is another test that does not use XML configuration and behaves a bit differently:
{code:title=JmsTransactionMulticast2.java}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;


/**
 * Reproducer for CAMEL-8680 configured in plain Java (no Spring XML context).
 * <p/>
 * A transacted JMS component backed by an in-VM ActiveMQ connection factory is registered
 * under the scheme name "wmq", and a SpringTransactionPolicy is bound in the registry under
 * the name PROPAGATION_REQUIRES_NEW so the routes can reference it from transacted(...).
 */
public class JmsTransactionMulticast2 extends CamelTestSupport {

    /** Registry name of the transaction policy and, equally, its propagation behavior name. */
    private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";

    private static final String QUEUE_A1 = "wmq:queue:A1";
    private static final String QUEUE_A2 = "wmq:queue:A2";
    private static final String QUEUE_ERROR = "wmq:queue:AERROR";

    final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    final PlatformTransactionManager transactionManager = new JmsTransactionManager(connectionFactory);

    @EndpointInject(uri = "mock:result1")
    private MockEndpoint result1;

    @EndpointInject(uri = "mock:result2")
    private MockEndpoint result2;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @EndpointInject(uri = "direct:start")
    private ProducerTemplate start;

    @EndpointInject(uri = "direct:start2")
    private ProducerTemplate start2;

    /**
     * Adds the routes under test to the Camel context. Each test calls this itself.
     *
     * @throws Exception if the routes cannot be added
     */
    public void addRoutes() throws Exception {
        context.addRoutes(new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Every exception is handled, forwarded to the error queue, and the
                // transaction is then marked rollback-only.
                onException(Exception.class).handled(true).to(QUEUE_ERROR).markRollbackOnlyLast();

                // Bridge each JMS queue to the mock endpoint the tests assert on.
                from(QUEUE_A1).to("mock:result1");
                from(QUEUE_A2).to("mock:result2");
                from(QUEUE_ERROR).to("mock:dlq");

                // Multicast route
                from("direct:start")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");

                // Route that does not multicast
                from("direct:start2")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .to("direct:2");

                // Accepts only body "a"; anything else fails.
                from("direct:1")
                    .choice()
                    .when(body().isEqualTo("a")).to(QUEUE_A1)
                    .otherwise().throwException(new Exception("fail in 1"));

                // Accepts only body "b"; anything else fails.
                from("direct:2")
                    .choice()
                    .when(body().isEqualTo("b")).to(QUEUE_A2)
                    .otherwise().throwException(new Exception("fail in 2"));
            }
        });
    }

    // Non-multicast route; body "a" is rejected by direct:2.
    @Test
    public void start2Send_rollback() throws Exception {
        sendAndExpectRollback(start2, "a");
    }

    // Multicast route; body "a" is accepted by direct:1 and
    // will throw an exception in direct:2.
    @Test
    public void start1Send_rollback() throws Exception {
        sendAndExpectRollback(start, "a");
    }

    // Multicast route; body "b" will throw an exception in direct:1.
    @Test
    public void testSend_rollback2() throws Exception {
        sendAndExpectRollback(start, "b");
    }

    /**
     * Adds the routes, sends {@code body} through {@code producer} and asserts the outcome
     * every test in this class expects: nothing on either result mock and exactly one
     * message on the dlq mock.
     *
     * @param producer template bound to the route entry point to exercise
     * @param body     message body to send
     * @throws Exception if the routes cannot be added, sending fails or an expectation is not satisfied
     */
    private void sendAndExpectRollback(ProducerTemplate producer, String body) throws Exception {
        addRoutes();

        // Expectations must be registered before the message is sent.
        result1.expectedMessageCount(0);
        result2.expectedMessageCount(0);
        dlq.expectedMessageCount(1);

        producer.sendBody(body);

        result1.assertIsSatisfied();
        result2.assertIsSatisfied();
        dlq.assertIsSatisfied();
    }

    /**
     * Creates the Camel context and registers the transacted "wmq" JMS component on it.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        addWmqComponent(camelContext);
        return camelContext;
    }

    /**
     * Registers a transacted JMS component with a single concurrent consumer under the
     * scheme name "wmq".
     *
     * @param camelContext context to add the component to
     */
    private void addWmqComponent(CamelContext camelContext) {
        final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
        component.setConcurrentConsumers(1);
        camelContext.addComponent("wmq", component);
    }

    /**
     * Binds the transaction manager and the REQUIRES_NEW transaction policy in the registry.
     */
    @Override
    protected JndiRegistry createRegistry() throws Exception {
        final JndiRegistry registry = super.createRegistry();
        registry.bind("jmsTransactionManager", transactionManager);
        registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
        return registry;
    }

    /**
     * Builds a transaction policy on top of this test's transaction manager.
     *
     * @param propagation propagation behavior name to set on the policy
     * @return the configured policy
     */
    private SpringTransactionPolicy createTransactionPolicy(String propagation) {
        final SpringTransactionPolicy policy = new SpringTransactionPolicy();
        policy.setTransactionManager(transactionManager);
        policy.setPropagationBehaviorName(propagation);
        return policy;
    }

}
{code}

  was:
I did not manage to create a pull request, but here's a test which in my opinion should work:

{code:title=JmsTransactionMulticast.java}
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

/**
 * Reproducer for CAMEL-8680: routes that are transacted with the
 * PROPAGATION_REQUIRES_NEW policy, one of which multicasts to two sub-routes.
 * <p/>
 * The unit test will look for the Spring XML file with the same class name
 * but postfixed with -context.xml as filename (JmsTransactionMulticast-context.xml).
 * <p/>
 * We use Spring Testing for the unit test, i.e. we extend AbstractJUnit4SpringContextTests,
 * which is a Spring class.
 */
@ContextConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {

    /** Bean id of the SpringTransactionPolicy declared in JmsTransactionMulticast-context.xml. */
    private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";

    @Autowired
    private CamelContext camelContext;

    @EndpointInject(uri = "mock:result1")
    private MockEndpoint result1;

    @EndpointInject(uri = "mock:result2")
    private MockEndpoint result2;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @EndpointInject(uri = "direct:start")
    private ProducerTemplate start;

    @EndpointInject(uri = "direct:start2")
    private ProducerTemplate start2;

    /**
     * Adds the routes under test to the injected Camel context before each test method.
     *
     * @throws Exception if the routes cannot be added
     */
    @Before
    public void setUp() throws Exception {
        camelContext.addRoutes(new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Every exception is handled, forwarded to mock:dlq, and the
                // transaction is then marked rollback-only.
                onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();

                // Multicast route
                from("direct:start")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");

                // Route that does not multicast
                from("direct:start2")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .to("direct:2");

                // Accepts only body "a"; anything else fails.
                from("direct:1")
                    .choice()
                    .when(body().isEqualTo("a")).to("mock:result1")
                    .otherwise().throwException(new Exception("fail in 1"));

                // Accepts only body "b"; anything else fails.
                from("direct:2")
                    .choice()
                    .when(body().isEqualTo("b")).to("mock:result2")
                    .otherwise().throwException(new Exception("fail in 2"));
            }
        });
    }

    // Non-multicast route; body "a" is rejected by direct:2.
    // Per the reporter this case does work: it rolls back properly.
    @Test
    public void start2Send_rollback() throws Exception {
        sendAndExpectRollback(start2, "a");
    }

    // Multicast route; body "a" is accepted by direct:1 and
    // will throw an exception in direct:2.
    @Test
    public void start1Send_rollback() throws Exception {
        sendAndExpectRollback(start, "a");
    }

    // Multicast route; body "b" will throw an exception in direct:1.
    @Test
    public void testSend_rollback2() throws Exception {
        sendAndExpectRollback(start, "b");
    }

    /**
     * Sends {@code body} through {@code producer} and asserts the outcome every test in this
     * class expects: nothing on either result mock and exactly one message on the dlq mock.
     *
     * @param producer template bound to the route entry point to exercise
     * @param body     message body to send
     * @throws Exception if sending fails or an expectation is not satisfied
     */
    private void sendAndExpectRollback(ProducerTemplate producer, String body) throws Exception {
        // Expectations must be registered before the message is sent.
        result1.expectedMessageCount(0);
        result2.expectedMessageCount(0);
        dlq.expectedMessageCount(1);

        producer.sendBody(body);

        result1.assertIsSatisfied();
        result2.assertIsSatisfied();
        dlq.assertIsSatisfied();
    }

}
{code}

{code:title=JmsTransactionMulticast-context.xml}
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- transaction policy using PROPAGATION_REQUIRES_NEW; the routes reference it by this bean id in transacted(...) -->
  <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
  </bean>

  <!-- setup JMS connection factory -->
  <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
  </bean>

  <!-- setup spring jms TX manager -->
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
  </bean>

  <!-- define our activemq component -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
    <!-- define the jms consumer/producer as transacted -->
    <property name="transacted" value="true"/>
    <!-- setup the transaction manager to use -->
    <!-- if not provided then Camel will automatically use a JmsTransactionManager; however, if you
         for instance use a JTA transaction manager then you must configure it -->
    <property name="transactionManager" ref="jmsTransactionManager"/>
  </bean>

  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

</beans>
{code}

And here is another test that does not use XML configuration and behaves a bit differently:
{code:title=JmsTransactionMulticast2.java}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;


/**
 * Reproducer for CAMEL-8680 configured in plain Java (no Spring XML context).
 * <p/>
 * A transacted JMS component backed by an in-VM ActiveMQ connection factory is registered
 * under the scheme name "wmq", and a SpringTransactionPolicy is bound in the registry under
 * the name PROPAGATION_REQUIRES_NEW so the routes can reference it from transacted(...).
 */
public class JmsTransactionMulticast2 extends CamelTestSupport {

    /** Registry name of the transaction policy and, equally, its propagation behavior name. */
    private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";

    private static final String QUEUE_A1 = "wmq:queue:A1";
    private static final String QUEUE_A2 = "wmq:queue:A2";
    private static final String QUEUE_ERROR = "wmq:queue:AERROR";

    final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    final PlatformTransactionManager transactionManager = new JmsTransactionManager(connectionFactory);

    @EndpointInject(uri = "mock:result1")
    private MockEndpoint result1;

    @EndpointInject(uri = "mock:result2")
    private MockEndpoint result2;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @EndpointInject(uri = "direct:start")
    private ProducerTemplate start;

    @EndpointInject(uri = "direct:start2")
    private ProducerTemplate start2;

    /**
     * Adds the routes under test to the Camel context. Each test calls this itself.
     *
     * @throws Exception if the routes cannot be added
     */
    public void addRoutes() throws Exception {
        context.addRoutes(new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Every exception is handled, forwarded to the error queue, and the
                // transaction is then marked rollback-only.
                onException(Exception.class).handled(true).to(QUEUE_ERROR).markRollbackOnlyLast();

                // Bridge each JMS queue to the mock endpoint the tests assert on.
                from(QUEUE_A1).to("mock:result1");
                from(QUEUE_A2).to("mock:result2");
                from(QUEUE_ERROR).to("mock:dlq");

                // Multicast route
                from("direct:start")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");

                // Route that does not multicast
                from("direct:start2")
                    .transacted(PROPAGATION_REQUIRES_NEW)
                    .to("direct:2");

                // Accepts only body "a"; anything else fails.
                from("direct:1")
                    .choice()
                    .when(body().isEqualTo("a")).to(QUEUE_A1)
                    .otherwise().throwException(new Exception("fail in 1"));

                // Accepts only body "b"; anything else fails.
                from("direct:2")
                    .choice()
                    .when(body().isEqualTo("b")).to(QUEUE_A2)
                    .otherwise().throwException(new Exception("fail in 2"));
            }
        });
    }

    // Non-multicast route; body "a" is rejected by direct:2.
    @Test
    public void start2Send_rollback() throws Exception {
        sendAndExpectRollback(start2, "a");
    }

    // Multicast route; body "a" is accepted by direct:1 and
    // will throw an exception in direct:2.
    @Test
    public void start1Send_rollback() throws Exception {
        sendAndExpectRollback(start, "a");
    }

    // Multicast route; body "b" will throw an exception in direct:1.
    @Test
    public void testSend_rollback2() throws Exception {
        sendAndExpectRollback(start, "b");
    }

    /**
     * Adds the routes, sends {@code body} through {@code producer} and asserts the outcome
     * every test in this class expects: nothing on either result mock and exactly one
     * message on the dlq mock.
     *
     * @param producer template bound to the route entry point to exercise
     * @param body     message body to send
     * @throws Exception if the routes cannot be added, sending fails or an expectation is not satisfied
     */
    private void sendAndExpectRollback(ProducerTemplate producer, String body) throws Exception {
        addRoutes();

        // Expectations must be registered before the message is sent.
        result1.expectedMessageCount(0);
        result2.expectedMessageCount(0);
        dlq.expectedMessageCount(1);

        producer.sendBody(body);

        result1.assertIsSatisfied();
        result2.assertIsSatisfied();
        dlq.assertIsSatisfied();
    }

    /**
     * Creates the Camel context and registers the transacted "wmq" JMS component on it.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        addWmqComponent(camelContext);
        return camelContext;
    }

    /**
     * Registers a transacted JMS component with a single concurrent consumer under the
     * scheme name "wmq".
     *
     * @param camelContext context to add the component to
     */
    private void addWmqComponent(CamelContext camelContext) {
        final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
        component.setConcurrentConsumers(1);
        camelContext.addComponent("wmq", component);
    }

    /**
     * Binds the transaction manager and the REQUIRES_NEW transaction policy in the registry.
     */
    @Override
    protected JndiRegistry createRegistry() throws Exception {
        final JndiRegistry registry = super.createRegistry();
        registry.bind("jmsTransactionManager", transactionManager);
        registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
        return registry;
    }

    /**
     * Builds a transaction policy on top of this test's transaction manager.
     *
     * @param propagation propagation behavior name to set on the policy
     * @return the configured policy
     */
    private SpringTransactionPolicy createTransactionPolicy(String propagation) {
        final SpringTransactionPolicy policy = new SpringTransactionPolicy();
        policy.setTransactionManager(transactionManager);
        policy.setPropagationBehaviorName(propagation);
        return policy;
    }

}
{code}


> Transaction REQUIRES_NEW does not work with multicast
> -----------------------------------------------------
>
>                 Key: CAMEL-8680
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8680
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.15.1
>            Reporter: Johan Blomgren
>
> I cannot get this to work when trying to multicast a transacted message.
> Here are two tests that behave slightly differently: the first uses an XML context and the second uses plain Java.
> {code:title=JmsTransactionMulticast.java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.EndpointInject;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.junit.Before;
> import org.junit.Test;
> import org.springframework.beans.factory.annotation.Autowired;
> import org.springframework.test.annotation.DirtiesContext;
> import org.springframework.test.context.ContextConfiguration;
> import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
> /**
>  * Unit test will look for the spring .xml file with the same class name
>  * but postfixed with -context.xml as filename.
>  * <p/>
>  * We use Spring Testing for unit test, eg we extend AbstractJUnit4SpringContextTests
>  * that is a Spring class.
>  */
> @ContextConfiguration
> @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
> public class JmsTransactionMulticast extends AbstractJUnit4SpringContextTests {
>     @Autowired
>     private CamelContext camelContext;
>     @EndpointInject(uri = "mock:result1")
>     private MockEndpoint result1;
>     @EndpointInject(uri = "mock:result2")
>     private MockEndpoint result2;
>     @EndpointInject(uri = "mock:dlq")
>     private MockEndpoint dlq;
>     @EndpointInject(uri = "direct:start")
>     private ProducerTemplate start;
>     @EndpointInject(uri = "direct:start2")
>     private ProducerTemplate start2;
>     @Before
>     public void setUp() throws Exception {
>         camelContext.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 onException(Exception.class).handled(true).to("mock:dlq").markRollbackOnlyLast();
>                 // Multicast route
>                 from("direct:start")
>                     .transacted("PROPAGATION_REQUIRES_NEW")
>                     .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
>                 // Route that does not multicast
>                 from("direct:start2")
>                     .transacted("PROPAGATION_REQUIRES_NEW")
>                     .to("direct:2")
>                 ;
>                 from("direct:1")
>                     .choice()
>                     .when(body().isEqualTo("a")).to("mock:result1")
>                     .otherwise().throwException(new Exception("fail in |"));
>                 ;
>                 from("direct:2")
>                     .choice()
>                     .when(body().isEqualTo("b")).to("mock:result2")
>                     .otherwise().throwException(new Exception("fail in 2"));
>                 ;
>             }
>         });
>     }
>     // this does work, it rollbacks properly
>     @Test
>     public void start2Send_rollback() throws Exception {
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start2.sendBody("a");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
>     //This will throw an exception in direct:2
>     @Test
>     public void start1Send_rollback() throws Exception {
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start.sendBody("a");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
>     //This will throw an exception in direct:1
>     @Test
>     public void testSend_rollback2() throws Exception {
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start.sendBody("b");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
> }
> {code}
> {code:title=JmsTransactionMulticast-context.xml}
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:broker="http://activemq.apache.org/schema/core"
>        xsi:schemaLocation="
>        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
>        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
>        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>   <!-- transaction policy using PROPAGATION_REQUIRES_NEW; the routes reference it by this bean id in transacted(...) -->
>   <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
>     <property name="transactionManager" ref="jmsTransactionManager"/>
>     <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
>   </bean>
>   <!-- setup JMS connection factory -->
>   <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
>     <property name="maxConnections" value="8"/>
>     <property name="connectionFactory" ref="jmsConnectionFactory"/>
>   </bean>
>   <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
>     <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
>   </bean>
>   <!-- setup spring jms TX manager -->
>   <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
>     <property name="connectionFactory" ref="poolConnectionFactory"/>
>   </bean>
>   <!-- define our activemq component -->
>   <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
>     <property name="connectionFactory" ref="poolConnectionFactory"/>
>     <!-- define the jms consumer/producer as transacted -->
>     <property name="transacted" value="true"/>
>     <!-- setup the transaction manager to use -->
>     <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
>          for instance use a JTA transaction manager then you must configure it -->
>     <property name="transactionManager" ref="jmsTransactionManager"/>
>   </bean>
>   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
>   </camelContext>
> </beans>
> {code}
> And here is another test that does not use XML configuration and behaves a bit differently:
> {code:title=JmsTransactionMulticast2.java}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.CamelContext;
> import org.apache.camel.EndpointInject;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.jms.JmsComponent;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
> import org.springframework.jms.connection.JmsTransactionManager;
> import org.springframework.transaction.PlatformTransactionManager;
> public class JmsTransactionMulticast2 extends CamelTestSupport {
>     private final String a1 = "wmq:queue:A1";
>     private final String a2 = "wmq:queue:A2";
>     private final String errorQueue = "wmq:queue:AERROR";
>     private static final String PROPAGATION_REQUIRES_NEW = "PROPAGATION_REQUIRES_NEW";
>     @EndpointInject(uri = "mock:result1")
>     private MockEndpoint result1;
>     @EndpointInject(uri = "mock:result2")
>     private MockEndpoint result2;
>     @EndpointInject(uri = "mock:dlq")
>     private MockEndpoint dlq;
>     @EndpointInject(uri = "direct:start")
>     private ProducerTemplate start;
>     @EndpointInject(uri = "direct:start2")
>     private ProducerTemplate start2;
>     public void addRoutes() throws Exception {
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 onException(Exception.class).handled(true).to(errorQueue).markRollbackOnlyLast();
>                 from(a1).to("mock:result1");
>                 from(a2).to("mock:result2");
>                 from(errorQueue).to("mock:dlq");
>                 // Multicast route
>                 from("direct:start")
>                     .transacted("PROPAGATION_REQUIRES_NEW")
>                     .multicast().shareUnitOfWork().stopOnException().to("direct:1", "direct:2");
>                 // Route that does not multicast
>                 from("direct:start2")
>                     .transacted("PROPAGATION_REQUIRES_NEW")
>                     .to("direct:2")
>                 ;
>                 from("direct:1")
>                     .choice()
>                     .when(body().isEqualTo("a")).to(a1)
>                     .otherwise().throwException(new Exception("fail in |"));
>                 ;
>                 from("direct:2")
>                     .choice()
>                     .when(body().isEqualTo("b")).to(a2)
>                     .otherwise().throwException(new Exception("fail in 2"));
>                 ;
>             }
>         });
>     }
>     @Test
>     public void start2Send_rollback() throws Exception {
>         addRoutes();
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start2.sendBody("a");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
>     //This will throw an exception in direct:2
>     @Test
>     public void start1Send_rollback() throws Exception {
>         addRoutes();
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start.sendBody("a");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
>     //This will throw an exception in direct:1
>     @Test
>     public void testSend_rollback2() throws Exception {
>         addRoutes();
>         result1.expectedMessageCount(0);
>         result2.expectedMessageCount(0);
>         dlq.expectedMessageCount(1);
>         start.sendBody("b");
>         result1.assertIsSatisfied();
>         result2.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>     }
>     @Override
>     protected CamelContext createCamelContext() throws Exception {
>         CamelContext camelContext = super.createCamelContext();
>         addWmqComponent(camelContext);
>         return camelContext;
>     }
>     final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
>     final PlatformTransactionManager transactionManager = (PlatformTransactionManager) new JmsTransactionManager(connectionFactory);
>     private void addWmqComponent(CamelContext camelContext) {
>         final JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
>         component.setConcurrentConsumers(1);
>         camelContext.addComponent("wmq", component);
>     }
>     @Override
>     protected JndiRegistry createRegistry() throws Exception {
>         final JndiRegistry registry = super.createRegistry();
>         registry.bind("jmsTransactionManager", transactionManager);
>         registry.bind(PROPAGATION_REQUIRES_NEW, createTransactionPolicy(PROPAGATION_REQUIRES_NEW));
>         return registry;
>     }
>     private SpringTransactionPolicy createTransactionPolicy(String propagation) {
>         final SpringTransactionPolicy policy = new SpringTransactionPolicy();
>         policy.setTransactionManager(transactionManager);
>         policy.setPropagationBehaviorName(propagation);
>         return policy;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)