Posted to users@camel.apache.org by Krzysztof Burghardt <kr...@burghardt.pl> on 2010/03/05 16:46:20 UTC

Problem with transactional message forwarding between two ActiveMQ brokers

Dear Users,

I have configured Camel (v2.0) to forward messages from one broker to
another. I am now looking for a way to stop messages from disappearing
from the source queue (they are consumed by Camel) while the destination
broker is down. I prepared the config below with a transaction manager,
but it does not work as I expected:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core.xsd
       http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd
    ">

    <bean id="jmsConnectionFactory1"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <bean id="jmsConnectionFactory2"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61717"/>
    </bean>

    <bean id="jmsTransactionManager1"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory1"/>
    </bean>

    <bean id="jmsConfig1"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory1"/>
        <property name="transactionManager" ref="jmsTransactionManager1"/>
        <property name="transacted" value="true"/>
        <property name="transactedInOut" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="jmsTransactionManager2"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory2"/>
    </bean>

    <bean id="jmsConfig2"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory2"/>
        <property name="transactionManager" ref="jmsTransactionManager2"/>
        <property name="transacted" value="true"/>
        <property name="transactedInOut" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="activemq1" class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig1"/>
    </bean>

    <bean id="activemq2" class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig2"/>
    </bean>

    <bean id="PROPAGATION_REQUIRED_POLICY_1"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <constructor-arg>
            <bean class="org.springframework.transaction.support.TransactionTemplate">
                <property name="transactionManager" ref="jmsTransactionManager1"/>
            </bean>
        </constructor-arg>
    </bean>

    <bean id="PROPAGATION_REQUIRED_POLICY_2"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <constructor-arg>
            <bean class="org.springframework.transaction.support.TransactionTemplate">
                <property name="transactionManager" ref="jmsTransactionManager2"/>
            </bean>
        </constructor-arg>
    </bean>

    <camelContext id="camel1" xmlns="http://camel.apache.org/schema/spring">
        <route id="camel1-route1">
            <from uri="activemq1:queue:in1?transacted=true"/>
            <transacted/>
            <policy ref="PROPAGATION_REQUIRED_POLICY_1"/>
            <doTry>
                <to uri="activemq2:queue:out1?transacted=true"/>
                <doCatch>
                    <exception>java.lang.Exception</exception>
                    <rollback/>
                </doCatch>
            </doTry>
        </route>
        <route id="camel1-route2">
            <from uri="activemq2:queue:in2?transacted=true"/>
            <transacted/>
            <policy ref="PROPAGATION_REQUIRED_POLICY_2"/>
            <doTry>
                <to uri="activemq1:queue:out2?transacted=true"/>
                <doCatch>
                    <exception>java.lang.Exception</exception>
                    <rollback/>
                </doCatch>
            </doTry>
        </route>
    </camelContext>

</beans>

Messages from the input queue are consumed even when the destination
queue is unavailable (broker down). In the Camel log I found errors like
this (one for each input message):

ERROR | org.apache.camel.RollbackExchangeException: Intended rollback on the exchange: Exchange[Message: Enter some text here for the message body...]
 WARN | Execution of JMS message listener failed
org.apache.camel.spring.spi.TransactedRuntimeCamelException: org.apache.camel.RollbackExchangeException: Intended rollback on the exchange: Exchange[Message: Enter some text here for the message body...]
        at org.apache.camel.spring.spi.TransactionErrorHandler.wrapTransactedRuntimeException(TransactionErrorHandler.java:171)
        at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:121)

Any suggestions welcome.

Best regards,
-- 
Krzysztof Burghardt <kr...@burghardt.pl>
http://www.burghardt.pl/

Re: Problem with transactional message forwarding between two ActiveMQ brokers

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

First, I suggest using the latest release, Camel 2.2.

Then you should not use doTry, doCatch, etc., as the TX support works
out of the box.

Since you define your own policy, do not use a plain <transacted/> as
well.

With Camel 2.2 you can define it as
<transacted ref="MY_PROPAGATION_POLICY"/>

I don't think that ref attribute exists in 2.0, but you can take a look.
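
Putting those suggestions together, a minimal sketch of the first route
could look like the fragment below. It assumes Camel 2.2 and reuses the
bean ids from the original config (activemq1, activemq2,
PROPAGATION_REQUIRED_POLICY_1); it is untested against that setup, so
treat it as a starting point rather than a verified fix.

    <!-- Sketch only: assumes Camel 2.2, where <transacted> accepts ref -->
    <route id="camel1-route1">
        <from uri="activemq1:queue:in1?transacted=true"/>
        <!-- one transacted element that names the policy; no separate <policy> -->
        <transacted ref="PROPAGATION_REQUIRED_POLICY_1"/>
        <!-- no doTry/doCatch: a failed send should surface as an exception
             and let the transaction on the consuming side roll back -->
        <to uri="activemq2:queue:out1?transacted=true"/>
    </route>

The second route would presumably follow the same pattern with
PROPAGATION_REQUIRED_POLICY_2. Note that the two JmsTransactionManager
beans are independent local transactions, not a single XA transaction
spanning both brokers.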





-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus