Posted to issues@camel.apache.org by "Andrey Filippov (Jira)" <ji...@apache.org> on 2021/12/19 08:53:00 UTC
[jira] [Comment Edited] (CAMEL-17347) Wrong polling camel-spring-rabbitmq consumer onexception behavior
[ https://issues.apache.org/jira/browse/CAMEL-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461414#comment-17461414 ]
Andrey Filippov edited comment on CAMEL-17347 at 12/19/21, 8:52 AM:
--------------------------------------------------------------------
[~davsclaus] thanks for the reply. Yes, org.apache.camel.component.springrabbit.SpringRabbitPollingConsumer uses org.springframework.amqp.rabbit.core.RabbitTemplate, but the latter has a flag called 'channelTransacted' [1]. If I mark a route with <transacted /> and add
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
</bean>
to the configuration, the behavior does not change: messages are removed from the queue immediately, without an intermediate unacknowledged state. By contrast, if I use the same transaction setup with Artemis, messages are not marked as delivered until the end of the route. I might be wrong, but Camel could use the 'channelTransacted' flag in the polling consumer.
[1] https://docs.spring.io/spring-amqp/reference/html/amqp.html#transactions
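For illustration, this is how the 'channelTransacted' flag is set on a standalone RabbitTemplate in plain Spring XML. It is only a sketch of the flag itself: the bean id "rabbitTemplate" is a made-up name, "rabbitConnectionFactory" is the bean from the ticket description, and nothing here implies that the Camel polling consumer currently picks up such a template or honors the flag.

```xml
<!-- Hypothetical bean id; shown only to illustrate the flag described in [1]. -->
<bean id="rabbitTemplate"
      class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  <!-- Connection factory bean as defined in the ticket description. -->
  <property name="connectionFactory" ref="rabbitConnectionFactory"/>
  <!-- Makes the template use transactional channels; the effect on receive
       operations is described in the Spring AMQP reference [1]. -->
  <property name="channelTransacted" value="true"/>
</bean>
```

Whether the polling consumer could apply the same setting to the template it uses internally is the open question of this comment.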
> Wrong polling camel-spring-rabbitmq consumer onexception behavior
> ------------------------------------------------------------------
>
> Key: CAMEL-17347
> URL: https://issues.apache.org/jira/browse/CAMEL-17347
> Project: Camel
> Issue Type: Bug
> Components: camel-spring
> Affects Versions: 3.11.3
> Environment: Java 11, Karaf 4.2.9
> Reporter: Andrey Filippov
> Priority: Major
>
> Hello,
> I am checking 2 simple routes. First one is:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{ xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 blueprint-1.0.0.xsd ">}}
> {{ <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{ <property name="uri" value="amqp://localhost:5672"/>}}
> {{ <property name="username" value="guest"/>}}
> {{ <property name="password" value="guest"/>}}
> {{ </bean>}}
> {{ <camelContext}}
> {{ xmlns="http://camel.apache.org/schema/blueprint"}}
> {{ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{ xsi:schemaLocation="http://camel.apache.org/schema/blueprint camel-blueprint-3.4.5.xsd">}}
> {{ <route id="consume">}}
> {{ <from uri="spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1" />}}
> {{ <onException>}}
> {{ <exception>java.lang.Exception</exception>}}
> {{ <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{ <handled><constant>true</constant></handled>}}
> {{ <to uri="direct:error" />}}
> {{ </onException>}}
> {{ <log message=">>>Consumer start"/>}}
> {{ <throwException exceptionType="java.lang.NullPointerException"}}
> {{ message="Not supported, sorjan"/>}}
> {{ <log message=">>>Body: ${body}"/>}}
> {{ <log message=">>>Headers: ${headers}"/>}}
> {{ <when>}}
> {{ <simple>${body} == null</simple>}}
> {{ <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is null!!!!"/>}}
> {{ </when>}}
> {{ </route>}}
> {{ <route>}}
> {{ <from uri="direct:error" />}}
> {{ <log message=">>>Error happens body: ${body}"/>}}
> {{ </route>}}
> {{ </camelContext>}}
> {{</blueprint>}}
> In this case a message from the queue stays marked as unacknowledged until the very end, after all redelivery attempts have fired. On the other hand, if I use a polling consumer, like this:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{ xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 blueprint-1.0.0.xsd ">}}
> {{ <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{ <property name="uri" value="amqp://localhost:5672"/>}}
> {{ <property name="username" value="guest"/>}}
> {{ <property name="password" value="guest"/>}}
> {{ </bean>}}
> {{ <camelContext}}
> {{ xmlns="http://camel.apache.org/schema/blueprint"}}
> {{ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{ xsi:schemaLocation="http://camel.apache.org/schema/blueprint camel-blueprint-3.4.5.xsd">}}
> {{ <route id="consume">}}
> {{ <from uri="timer:consume?period=60000" />}}
> {{ <onException>}}
> {{ <exception>java.lang.Exception</exception>}}
> {{ <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{ <handled><constant>true</constant></handled>}}
> {{ <to uri="direct:error" />}}
> {{ </onException>}}
> {{ <log message=">>>Consumer start"/>}}
> {{ <pollEnrich>}}
> {{ <constant>spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1&amp;acknowledgeMode=MANUAL</constant>}}
> {{ </pollEnrich>}}
> {{ <throwException exceptionType="java.lang.NullPointerException"}}
> {{ message="Not supported, sorjan"/>}}
> {{ <log message=">>>Body: ${body}"/>}}
> {{ <log message=">>>Headers: ${headers}"/>}}
> {{ <when>}}
> {{ <simple>${body} == null</simple>}}
> {{ <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is null!!!!"/>}}
> {{ </when>}}
> {{ </route>}}
> {{ <route>}}
> {{ <from uri="direct:error" />}}
> {{ <log message=">>>Error happens body: ${body}"/>}}
> {{ </route>}}
> {{ </camelContext>}}
> {{</blueprint>}}
> In this case the message disappears from the queue immediately after pollEnrich, without ever being marked as unacknowledged.
> I think this behavior is not 100% correct.
> P.S. Sorry, I selected the wrong component for this ticket because camel-spring-rabbitmq is not in the list.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)