Posted to issues@camel.apache.org by "Andrey Filippov (Jira)" <ji...@apache.org> on 2021/12/19 08:53:00 UTC

[jira] [Comment Edited] (CAMEL-17347) Wrong polling camel-spring-rabbitmq consumer onexception behavior

    [ https://issues.apache.org/jira/browse/CAMEL-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461414#comment-17461414 ] 

Andrey Filippov edited comment on CAMEL-17347 at 12/19/21, 8:52 AM:
--------------------------------------------------------------------

[~davsclaus] thanks for the reply. Yes, org.apache.camel.component.springrabbit.SpringRabbitPollingConsumer uses org.springframework.amqp.rabbit.core.RabbitTemplate, but the latter has a flag called 'channelTransacted' [1]. If I mark a route with <transacted /> and add

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
</bean>

to the scope, the behavior does not change: messages are still read from the queue immediately, without any intermediate unacknowledged state. By contrast, if I set up the same transaction configuration with Artemis, messages are not marked as delivered until the end of the route. I might be wrong, but Camel could use the 'channelTransacted' flag in the polling consumer.
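
To illustrate the flag I mean: on a standalone Spring AMQP template it would be switched on roughly as shown here. This is only a sketch. The bean id rabbitTemplate is a made-up name for illustration, and nothing here wires the bean into the Camel endpoint, so I do not expect it to change the polling consumer's behavior by itself.

```xml
<!-- Sketch only: "rabbitTemplate" is a hypothetical bean id, not something Camel looks up. -->
<bean id="rabbitTemplate"
      class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <!-- Reuses the connection factory already declared in the issue description. -->
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <!-- The flag from [1]: channel operations of this template take part in a transaction. -->
    <property name="channelTransacted" value="true"/>
</bean>
```

The suggestion is that the polling consumer could set the same flag on the template it uses when the route is transacted.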

 

 

[1] [https://docs.spring.io/spring-amqp/reference/html/amqp.html#transactions]


was (Author: afilippov):
[~davsclaus] thanks for the reply. Yes, org.apache.camel.component.springrabbit.SpringRabbitPollingConsumer uses org.springframework.amqp.rabbit.core.RabbitTemplate but the last one has a flag called 'channelTransacted' - [1]. And if I mark a route with <transacted /> and add 

<bean id="rabbitTxManager"
        class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    </bean>

to the scope the behavior does not change - messages are read from a queue without acks. At the same time If I do the same tx stuff with Artemis messages will not be marked as delivered until the end of the route. I might be wrong but Camel could use 'channelTransacted' flag in the polling consumer.

 

 

[1] https://docs.spring.io/spring-amqp/reference/html/amqp.html#transactions

> Wrong polling camel-spring-rabbitmq consumer onexception behavior 
> ------------------------------------------------------------------
>
>                 Key: CAMEL-17347
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17347
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-spring
>    Affects Versions: 3.11.3
>         Environment: Java 11, Karaf 4.2.9
>            Reporter: Andrey Filippov
>            Priority: Major
>
> Hello,
> I am testing two simple routes. The first one is:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 blueprint-1.0.0.xsd ">}}
> {{    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{      <property name="uri" value="amqp://localhost:5672"/>}}
> {{      <property name="username" value="guest"/>}}
> {{      <property name="password" value="guest"/>}}
> {{    </bean>}}
> {{    <camelContext}}
> {{        xmlns="http://camel.apache.org/schema/blueprint"}}
> {{        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{        xsi:schemaLocation="http://camel.apache.org/schema/blueprint camel-blueprint-3.4.5.xsd">}}
> {{        <route id="consume">}}
> {{            <from uri="spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1" />}}
> {{            <onException>}}
> {{                <exception>java.lang.Exception</exception>}}
> {{                <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{                <handled><constant>true</constant></handled>}}
> {{                <to uri="direct:error" />}}
> {{            </onException>}}
> {{            <log message=">>>Consumer start"/>}}
> {{            <throwException exceptionType="java.lang.NullPointerException"}}
> {{                                message="Not supported, sorjan"/>}}
> {{            <log message=">>>Body:  ${body}"/>}}
> {{            <log message=">>>Headers:  ${headers}"/>}}
> {{            <when>}}
> {{                <simple>${body} == null</simple>}}
> {{                <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is null!!!!"/>}}
> {{            </when>}}
> {{            }}
> {{        </route>}}
> {{        <route>}}
> {{            <from uri="direct:error" />}}
> {{            <log message=">>>Error happens body:  ${body}"/>}}
> {{        </route>}}
> {{    </camelContext>}}
> {{</blueprint>}}
> In this case, the message stays unacknowledged in the queue until the very end, that is, until all redelivery attempts have been made. On the other hand, if I use a polling consumer like this:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 blueprint-1.0.0.xsd ">}}
> {{    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{      <property name="uri" value="amqp://localhost:5672"/>}}
> {{      <property name="username" value="guest"/>}}
> {{      <property name="password" value="guest"/>}}
> {{    </bean>}}
> {{    <camelContext}}
> {{        xmlns="http://camel.apache.org/schema/blueprint"}}
> {{        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{        xsi:schemaLocation="http://camel.apache.org/schema/blueprint camel-blueprint-3.4.5.xsd">}}
> {{        <route id="consume">}}
> {{            <from uri="timer:consume?period=60000" />}}
> {{            <onException>}}
> {{                <exception>java.lang.Exception</exception>}}
> {{                <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{                <handled><constant>true</constant></handled>}}
> {{                <to uri="direct:error" />}}
> {{            </onException>}}
> {{            <log message=">>>Consumer start"/>}}
> {{            <pollEnrich>}}
> {{                <constant>spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1&amp;acknowledgeMode=MANUAL</constant>}}
> {{            </pollEnrich>}}
> {{            <throwException exceptionType="java.lang.NullPointerException"}}
> {{                                message="Not supported, sorjan"/>}}
> {{            <log message=">>>Body:  ${body}"/>}}
> {{            <log message=">>>Headers:  ${headers}"/>}}
> {{            <when>}}
> {{                <simple>${body} == null</simple>}}
> {{                <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is null!!!!"/>}}
> {{            </when>}}
> {{            }}
> {{        </route>}}
> {{        <route>}}
> {{            <from uri="direct:error" />}}
> {{            <log message=">>>Error happens body:  ${body}"/>}}
> {{        </route>}}
> {{    </camelContext>}}
> {{</blueprint>}}
> In this case, the message disappears from the queue immediately after pollEnrich, without ever being marked as unacknowledged.
> I think this behavior is not 100% correct.
> P.S. Sorry, I selected the wrong component for this ticket because camel-spring-rabbitmq is not in the list.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)