You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Valdis Andersons <va...@vhi.ie> on 2018/08/16 15:09:43 UTC

Camel with Rabbitmq: messages in temp reply queue not being acked

Hi All,

Hopefully someone here can educate me on the below matter.

Here is a rather long-winded description of the issue: https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au

The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.

If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):

rest(restEndpoint).post(postEndpoint)
                  .type(MyRequest.class)
                  .route()
                  .startupOrder(Integer.MAX_VALUE - 2)
                                  .process(this::process)
                                  .choice()
                                    .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint)

I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.

I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.


Thanks and Best Regards,
Valdis

Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks the PR has been merged, you can see from the fix-version in the
JIRA which releases it will be in

On Wed, Aug 22, 2018 at 10:58 AM, Valdis Andersons
<va...@vhi.ie> wrote:
> Hi Claus,
>
> Pull request #2490 has been created now.
> https://github.com/apache/camel/pull/2490
>
>  If you're happy with the change could you please merge it up and let me know which release will have the code in it?
> Also, if there is anything more I need to do with the fix/Pull Request please let me know.
>
>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: 22 August 2018 07:10
> To: users@camel.apache.org
> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> Hi Validis
>
> Yeah that sounds good. Great to hear we have a solution for this issue now.
>
>
>
> On Tue, Aug 21, 2018 at 5:58 PM, Valdis Andersons <va...@vhi.ie> wrote:
>> Hi Claus,
>>
>> That seems to work as expected, deployed a local build to test and no more stale messages in un-acked states in the temp reply queues and in addition it doesn't seem to brake any tests either.
>> JIRA ticket here:
>> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
>> Qi5kMmhzrvfwg&s=33&u=https%3a%2f%2fissues%2eapache%2eorg%2fjira%2fbrow
>> se%2fCAMEL-12746
>>
>> Tomorrow I'll figure out how to use GitHub and Pull Requests correctly and will get the change committed there (life-long SVN user here).
>> For the time being we'll be using the local snapshot of the camel-rabbitmq component and once the fix has been promoted to a general release we'll then change over to that.
>>
>>
>> Thanks,
>> Valdis
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>> Sent: 21 August 2018 11:22
>> To: users@camel.apache.org
>> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not
>> being acked
>>
>> On Tue, Aug 21, 2018 at 10:05 AM, Valdis Andersons <va...@vhi.ie> wrote:
>>> Hi Claus,
>>>
>>> Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
>>> As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
>>> Please let me know if you see a better alternative or if I'm missing something.
>>>
>>
>> I think this is a good idea and also what I would try first.
>>
>> We love contributions
>> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
>> Qi5kJ2izLOJww&s=33&u=https%3a%2f%2fgithub%2ecom%2fapache%2fcamel%2fblo
>> b%2fmaster%2fCONTRIBUTING%2emd
>>
>>>
>>> Thanks,
>>> Valdis
>>>
>>> -----Original Message-----
>>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>>> Sent: 21 August 2018 08:31
>>> To: users@camel.apache.org
>>> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not
>>> being acked
>>>
>>> Hi
>>>
>>> You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.
>>>
>>> I wonder if you could modify the source code and try to "fix this"
>>> yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.
>>>
>>> Anyone else have other thoughts or comments?
>>>
>>> On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
>>>> Hi All,
>>>>
>>>> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>>>>
>>>> /**
>>>>          * Bind consumer to channel
>>>>          */
>>>>         private void start() throws IOException {
>>>>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>>>>         }
>>>>
>>>> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>>>>
>>>> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>>>>
>>>> if (handler != null) {
>>>>             correlation.remove(correlationID);
>>>>             handler.onReply(correlationID, properties, message);
>>>>         }
>>>>
>>>> Or TemporaryQueueReplyHandler line 56 - 59:
>>>>
>>>> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>>>>         // create holder object with the the reply
>>>>         http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kJz0nLSLlg&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>>>>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>>>>         // process the reply
>>>>         replyManager.processReply(holder);
>>>>     }
>>>>
>>>> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>>>>
>>>> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>>>>
>>>>
>>>> Thanks,
>>>> Valdis
>>>>
>>>> From: Valdis Andersons
>>>> Sent: 16 August 2018 16:10
>>>> To: users@camel.apache.org
>>>> Subject: Camel with Rabbitmq: messages in temp reply queue not being
>>>> acked
>>>>
>>>> Hi All,
>>>>
>>>> Hopefully someone here can educate me on the below matter.
>>>>
>>>> Here is a rather long-winded description of the issue:
>>>> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ
>>>> 0
>>>> s
>>>> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2
>>>> f
>>>> 5
>>>> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-no
>>>> t
>>>> -
>>>> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726S
>>>> g
>>>> K
>>>> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow
>>>> %
>>>> 2
>>>> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in
>>>> -
>>>> t
>>>> emp-reply-queue-not-being-acked-when-au>
>>>>
>>>> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>>>>
>>>> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
>>>> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>>>>
>>>> rest(restEndpoint).post(postEndpoint)
>>>>                   .type(MyRequest.class)
>>>>                   .route()
>>>>                   .startupOrder(Integer.MAX_VALUE - 2)
>>>>                                   .process(this::process)
>>>>                                   .choice()
>>>>
>>>> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(
>>>> o u tputEmailEndpoint, outputArchiveEndpoint)
>>>>
>>>> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>>>>
>>>> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>>>>
>>>>
>>>> Thanks and Best Regards,
>>>> Valdis
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
>>> 5 mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel
>>> in Action 2:
>>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>>> x
>>> 5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>>>
>>> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare
>>> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi
>>> Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC
>>> trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi
>>> Insurance are regulated by the Central Bank of Ireland. Vhi
>>> Healthcare is tied to Vhi Insurance DAC for health insurance in
>>> Ireland which is underwritten by Vhi Insurance DAC. Vhi Healthcare is
>>> tied to Zurich Life Assurance plc for Vhi Life Term Insurance and Vhi
>>> Mortgage Protection which is underwritten by Zurich Life Assurance
>>> plc. Vhi Healthcare is tied to Collinson Insurance Services Limited
>>> for MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi
>>> Dental Insurance which are underwritten by Great Lakes Insurance SE,
>>> UK branch and for Vhi Canada Cover and Vhi International Health
>>> Insurance which are underwritten by Astrenska Insurance Limited. For
>>> more information about the Vhi Group please go to:
>>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>>> x 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>>
>>>
>>> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh
>>> seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi
>>> Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi
>>> Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi
>>> Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi
>>> Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare
>>> ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in
>>> Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare
>>> ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de
>>> chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá
>>> frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare
>>> ceangailte le Collinson Insurance Services Limited le haghaidh
>>> Árachas Taistil Ilturais agus Turasóirí Mála Droma agus Árachas
>>> Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes
>>> Insurance SE, UK branch agus le haghaidh Clúdach Cheanada de chuid
>>> Vhi agus Árachas Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta
>>> ag Astrenska Insurance Limited. Chun tuilleadh faisnéise a fháil faoi
>>> Ghrúpa Vhi, tabhair cuairt ar:
>>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>>> x 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>>
>>> This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQ
>> i5kM_6mbvbkQ&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in
>> Action 2:
>> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
>> Qi5kMD0zLGNwg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>
>
>
> --
> Claus Ibsen
> -----------------
> http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kM_6mbvbkQ&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kMD0zLGNwg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

RE: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Valdis Andersons <va...@vhi.ie>.
Hi Claus,

Pull request #2490 has been created now.
https://github.com/apache/camel/pull/2490

 If you're happy with the change could you please merge it up and let me know which release will have the code in it?
Also, if there is anything more I need to do with the fix/Pull Request please let me know.


Thanks,
Valdis

-----Original Message-----
From: Claus Ibsen [mailto:claus.ibsen@gmail.com] 
Sent: 22 August 2018 07:10
To: users@camel.apache.org
Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Hi Validis

Yeah that sounds good. Great to hear we have a solution for this issue now.



On Tue, Aug 21, 2018 at 5:58 PM, Valdis Andersons <va...@vhi.ie> wrote:
> Hi Claus,
>
> That seems to work as expected, deployed a local build to test and no more stale messages in un-acked states in the temp reply queues and in addition it doesn't seem to brake any tests either.
> JIRA ticket here: 
> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
> Qi5kMmhzrvfwg&s=33&u=https%3a%2f%2fissues%2eapache%2eorg%2fjira%2fbrow
> se%2fCAMEL-12746
>
> Tomorrow I'll figure out how to use GitHub and Pull Requests correctly and will get the change committed there (life-long SVN user here).
> For the time being we'll be using the local snapshot of the camel-rabbitmq component and once the fix has been promoted to a general release we'll then change over to that.
>
>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: 21 August 2018 11:22
> To: users@camel.apache.org
> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not 
> being acked
>
> On Tue, Aug 21, 2018 at 10:05 AM, Valdis Andersons <va...@vhi.ie> wrote:
>> Hi Claus,
>>
>> Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
>> As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
>> Please let me know if you see a better alternative or if I'm missing something.
>>
>
> I think this is a good idea and also what I would try first.
>
> We love contributions
> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
> Qi5kJ2izLOJww&s=33&u=https%3a%2f%2fgithub%2ecom%2fapache%2fcamel%2fblo
> b%2fmaster%2fCONTRIBUTING%2emd
>
>>
>> Thanks,
>> Valdis
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>> Sent: 21 August 2018 08:31
>> To: users@camel.apache.org
>> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not 
>> being acked
>>
>> Hi
>>
>> You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.
>>
>> I wonder if you could modify the source code and try to "fix this"
>> yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.
>>
>> Anyone else have other thoughts or comments?
>>
>> On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
>>> Hi All,
>>>
>>> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>>>
>>> /**
>>>          * Bind consumer to channel
>>>          */
>>>         private void start() throws IOException {
>>>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>>>         }
>>>
>>> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>>>
>>> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>>>
>>> if (handler != null) {
>>>             correlation.remove(correlationID);
>>>             handler.onReply(correlationID, properties, message);
>>>         }
>>>
>>> Or TemporaryQueueReplyHandler line 56 - 59:
>>>
>>> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>>>         // create holder object with the the reply
>>>         http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kJz0nLSLlg&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>>>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>>>         // process the reply
>>>         replyManager.processReply(holder);
>>>     }
>>>
>>> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>>>
>>> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>>>
>>>
>>> Thanks,
>>> Valdis
>>>
>>> From: Valdis Andersons
>>> Sent: 16 August 2018 16:10
>>> To: users@camel.apache.org
>>> Subject: Camel with Rabbitmq: messages in temp reply queue not being 
>>> acked
>>>
>>> Hi All,
>>>
>>> Hopefully someone here can educate me on the below matter.
>>>
>>> Here is a rather long-winded description of the issue:
>>> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ
>>> 0
>>> s
>>> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2
>>> f
>>> 5
>>> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-no
>>> t
>>> -
>>> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726S
>>> g
>>> K
>>> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow
>>> %
>>> 2
>>> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in
>>> -
>>> t
>>> emp-reply-queue-not-being-acked-when-au>
>>>
>>> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>>>
>>> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
>>> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>>>
>>> rest(restEndpoint).post(postEndpoint)
>>>                   .type(MyRequest.class)
>>>                   .route()
>>>                   .startupOrder(Integer.MAX_VALUE - 2)
>>>                                   .process(this::process)
>>>                                   .choice()
>>>
>>> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(
>>> o u tputEmailEndpoint, outputArchiveEndpoint)
>>>
>>> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>>>
>>> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>>>
>>>
>>> Thanks and Best Regards,
>>> Valdis
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
>> 5 mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel 
>> in Action 2:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>> x
>> 5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>>
>> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare 
>> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi 
>> Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC 
>> trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi 
>> Insurance are regulated by the Central Bank of Ireland. Vhi 
>> Healthcare is tied to Vhi Insurance DAC for health insurance in 
>> Ireland which is underwritten by Vhi Insurance DAC. Vhi Healthcare is 
>> tied to Zurich Life Assurance plc for Vhi Life Term Insurance and Vhi 
>> Mortgage Protection which is underwritten by Zurich Life Assurance 
>> plc. Vhi Healthcare is tied to Collinson Insurance Services Limited 
>> for MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi 
>> Dental Insurance which are underwritten by Great Lakes Insurance SE, 
>> UK branch and for Vhi Canada Cover and Vhi International Health 
>> Insurance which are underwritten by Astrenska Insurance Limited. For 
>> more information about the Vhi Group please go to:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>> x 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>
>>
>> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh 
>> seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi 
>> Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi 
>> Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi 
>> Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi 
>> Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare 
>> ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in 
>> Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare 
>> ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de 
>> chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá 
>> frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare 
>> ceangailte le Collinson Insurance Services Limited le haghaidh 
>> Árachas Taistil Ilturais agus Turasóirí Mála Droma agus Árachas 
>> Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes 
>> Insurance SE, UK branch agus le haghaidh Clúdach Cheanada de chuid 
>> Vhi agus Árachas Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta 
>> ag Astrenska Insurance Limited. Chun tuilleadh faisnéise a fháil faoi 
>> Ghrúpa Vhi, tabhair cuairt ar:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKX
>> x 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>
>> This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.
>>
>>
>>
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQ
> i5kM_6mbvbkQ&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in 
> Action 2: 
> https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2w
> Qi5kMD0zLGNwg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2



--
Claus Ibsen
-----------------
http://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kM_6mbvbkQ&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=7_7826JLPq2zjlJbYh3Jf1rqbQM2wQi5kMD0zLGNwg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2

Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Claus Ibsen <cl...@gmail.com>.
Hi Validis

Yeah that sounds good. Great to hear we have a solution for this issue now.



On Tue, Aug 21, 2018 at 5:58 PM, Valdis Andersons
<va...@vhi.ie> wrote:
> Hi Claus,
>
> That seems to work as expected, deployed a local build to test and no more stale messages in un-acked states in the temp reply queues and in addition it doesn't seem to brake any tests either.
> JIRA ticket here: https://issues.apache.org/jira/browse/CAMEL-12746
>
> Tomorrow I'll figure out how to use GitHub and Pull Requests correctly and will get the change committed there (life-long SVN user here).
> For the time being we'll be using the local snapshot of the camel-rabbitmq component and once the fix has been promoted to a general release we'll then change over to that.
>
>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: 21 August 2018 11:22
> To: users@camel.apache.org
> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> On Tue, Aug 21, 2018 at 10:05 AM, Valdis Andersons <va...@vhi.ie> wrote:
>> Hi Claus,
>>
>> Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
>> As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
>> Please let me know if you see a better alternative or if I'm missing something.
>>
>
> I think this is a good idea and also what I would try first.
>
> We love contributions
> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd_Y0j6SfUQ&s=33&u=https%3a%2f%2fgithub%2ecom%2fapache%2fcamel%2fblob%2fmaster%2fCONTRIBUTING%2emd
>
>>
>> Thanks,
>> Valdis
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>> Sent: 21 August 2018 08:31
>> To: users@camel.apache.org
>> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not
>> being acked
>>
>> Hi
>>
>> You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.
>>
>> I wonder if you could modify the source code and try to "fix this"
>> yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.
>>
>> Anyone else have other thoughts or comments?
>>
>> On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
>>> Hi All,
>>>
>>> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>>>
>>> /**
>>>          * Bind consumer to channel
>>>          */
>>>         private void start() throws IOException {
>>>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>>>         }
>>>
>>> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>>>
>>> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>>>
>>> if (handler != null) {
>>>             correlation.remove(correlationID);
>>>             handler.onReply(correlationID, properties, message);
>>>         }
>>>
>>> Or TemporaryQueueReplyHandler line 56 - 59:
>>>
>>> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>>>         // create holder object with the the reply
>>>         http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd_di36OdBA&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>>>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>>>         // process the reply
>>>         replyManager.processReply(holder);
>>>     }
>>>
>>> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>>>
>>> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>>>
>>>
>>> Thanks,
>>> Valdis
>>>
>>> From: Valdis Andersons
>>> Sent: 16 August 2018 16:10
>>> To: users@camel.apache.org
>>> Subject: Camel with Rabbitmq: messages in temp reply queue not being
>>> acked
>>>
>>> Hi All,
>>>
>>> Hopefully someone here can educate me on the below matter.
>>>
>>> Here is a rather long-winded description of the issue:
>>> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0
>>> s
>>> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f
>>> 5
>>> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not
>>> -
>>> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726Sg
>>> K
>>> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%
>>> 2
>>> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-
>>> t
>>> emp-reply-queue-not-being-acked-when-au>
>>>
>>> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>>>
>>> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
>>> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>>>
>>> rest(restEndpoint).post(postEndpoint)
>>>                   .type(MyRequest.class)
>>>                   .route()
>>>                   .startupOrder(Integer.MAX_VALUE - 2)
>>>                                   .process(this::process)
>>>                                   .choice()
>>>
>>> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(o
>>> u tputEmailEndpoint, outputArchiveEndpoint)
>>>
>>> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>>>
>>> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>>>
>>>
>>> Thanks and Best Regards,
>>> Valdis
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5
>> mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in
>> Action 2:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
>> 5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>>
>> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare
>> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi
>> Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC
>> trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi
>> Insurance are regulated by the Central Bank of Ireland. Vhi Healthcare
>> is tied to Vhi Insurance DAC for health insurance in Ireland which is
>> underwritten by Vhi Insurance DAC. Vhi Healthcare is tied to Zurich
>> Life Assurance plc for Vhi Life Term Insurance and Vhi Mortgage
>> Protection which is underwritten by Zurich Life Assurance plc. Vhi
>> Healthcare is tied to Collinson Insurance Services Limited for
>> MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi Dental
>> Insurance which are underwritten by Great Lakes Insurance SE, UK
>> branch and for Vhi Canada Cover and Vhi International Health Insurance
>> which are underwritten by Astrenska Insurance Limited. For more
>> information about the Vhi Group please go to:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
>> 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>
>>
>> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh
>> seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi
>> Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi
>> Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi
>> Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi
>> Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare
>> ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in Éirinn,
>> rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare
>> ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de
>> chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá
>> frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare
>> ceangailte le Collinson Insurance Services Limited le haghaidh Árachas
>> Taistil Ilturais agus Turasóirí Mála Droma agus Árachas Fiaclóireachta
>> de chuid Vhi atá frithgheallta ag Great Lakes Insurance SE, UK branch
>> agus le haghaidh Clúdach Cheanada de chuid Vhi agus Árachas Sláinte
>> Idirnáisiúnta de chuid Vhi atá frithgheallta ag Astrenska Insurance
>> Limited. Chun tuilleadh faisnéise a fháil faoi Ghrúpa Vhi, tabhair
>> cuairt ar:
>> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
>> 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>>
>> This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.
>>
>>
>>
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

RE: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Valdis Andersons <va...@vhi.ie>.
Hi Claus,

That seems to work as expected, deployed a local build to test and no more stale messages in un-acked states in the temp reply queues and in addition it doesn't seem to brake any tests either.
JIRA ticket here: https://issues.apache.org/jira/browse/CAMEL-12746

Tomorrow I'll figure out how to use GitHub and Pull Requests correctly and will get the change committed there (life-long SVN user here).
For the time being we'll be using the local snapshot of the camel-rabbitmq component and once the fix has been promoted to a general release we'll then change over to that.


Thanks,
Valdis

-----Original Message-----
From: Claus Ibsen [mailto:claus.ibsen@gmail.com] 
Sent: 21 August 2018 11:22
To: users@camel.apache.org
Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked

On Tue, Aug 21, 2018 at 10:05 AM, Valdis Andersons <va...@vhi.ie> wrote:
> Hi Claus,
>
> Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
> As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
> Please let me know if you see a better alternative or if I'm missing something.
>

I think this is a good idea and also what I would try first.

We love contributions
https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd_Y0j6SfUQ&s=33&u=https%3a%2f%2fgithub%2ecom%2fapache%2fcamel%2fblob%2fmaster%2fCONTRIBUTING%2emd

>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: 21 August 2018 08:31
> To: users@camel.apache.org
> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not 
> being acked
>
> Hi
>
> You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.
>
> I wonder if you could modify the source code and try to "fix this"
> yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.
>
> Anyone else have other thoughts or comments?
>
> On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
>> Hi All,
>>
>> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>>
>> /**
>>          * Bind consumer to channel
>>          */
>>         private void start() throws IOException {
>>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>>         }
>>
>> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>>
>> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>>
>> if (handler != null) {
>>             correlation.remove(correlationID);
>>             handler.onReply(correlationID, properties, message);
>>         }
>>
>> Or TemporaryQueueReplyHandler line 56 - 59:
>>
>> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>>         // create holder object with the the reply
>>         http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd_di36OdBA&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>>         // process the reply
>>         replyManager.processReply(holder);
>>     }
>>
>> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>>
>> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>>
>>
>> Thanks,
>> Valdis
>>
>> From: Valdis Andersons
>> Sent: 16 August 2018 16:10
>> To: users@camel.apache.org
>> Subject: Camel with Rabbitmq: messages in temp reply queue not being 
>> acked
>>
>> Hi All,
>>
>> Hopefully someone here can educate me on the below matter.
>>
>> Here is a rather long-winded description of the issue:
>> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0
>> s
>> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f
>> 5
>> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not
>> - 
>> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726Sg
>> K
>> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%
>> 2 
>> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-
>> t
>> emp-reply-queue-not-being-acked-when-au>
>>
>> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>>
>> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
>> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>>
>> rest(restEndpoint).post(postEndpoint)
>>                   .type(MyRequest.class)
>>                   .route()
>>                   .startupOrder(Integer.MAX_VALUE - 2)
>>                                   .process(this::process)
>>                                   .choice()
>>
>> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(o
>> u tputEmailEndpoint, outputArchiveEndpoint)
>>
>> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>>
>> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>>
>>
>> Thanks and Best Regards,
>> Valdis
>
>
>
> --
> Claus Ibsen
> -----------------
> http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5
> mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in 
> Action 2: 
> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
> 5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>
> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare 
> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi 
> Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC 
> trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi 
> Insurance are regulated by the Central Bank of Ireland. Vhi Healthcare 
> is tied to Vhi Insurance DAC for health insurance in Ireland which is 
> underwritten by Vhi Insurance DAC. Vhi Healthcare is tied to Zurich 
> Life Assurance plc for Vhi Life Term Insurance and Vhi Mortgage 
> Protection which is underwritten by Zurich Life Assurance plc. Vhi 
> Healthcare is tied to Collinson Insurance Services Limited for 
> MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi Dental 
> Insurance which are underwritten by Great Lakes Insurance SE, UK 
> branch and for Vhi Canada Cover and Vhi International Health Insurance 
> which are underwritten by Astrenska Insurance Limited. For more 
> information about the Vhi Group please go to: 
> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
> 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>
>
> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh 
> seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi 
> Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi 
> Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi 
> Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi 
> Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare 
> ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in Éirinn, 
> rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare 
> ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de 
> chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá 
> frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare 
> ceangailte le Collinson Insurance Services Limited le haghaidh Árachas 
> Taistil Ilturais agus Turasóirí Mála Droma agus Árachas Fiaclóireachta 
> de chuid Vhi atá frithgheallta ag Great Lakes Insurance SE, UK branch 
> agus le haghaidh Clúdach Cheanada de chuid Vhi agus Árachas Sláinte 
> Idirnáisiúnta de chuid Vhi atá frithgheallta ag Astrenska Insurance 
> Limited. Chun tuilleadh faisnéise a fháil faoi Ghrúpa Vhi, tabhair 
> cuairt ar: 
> https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx
> 5mEd6dk36KZAg&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>
> This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.
>
>
>
>
>



--
Claus Ibsen
-----------------
http://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd6Rs2qzNAw&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=9ef72zTINdsUDqY8pDvfCEraujKXx5mEd6tij6abUA&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2

Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Aug 21, 2018 at 10:05 AM, Valdis Andersons
<va...@vhi.ie> wrote:
> Hi Claus,
>
> Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
> As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
> Please let me know if you see a better alternative or if I'm missing something.
>

I think this is a good idea and also what I would try first.

We love contributions
https://github.com/apache/camel/blob/master/CONTRIBUTING.md

>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: 21 August 2018 08:31
> To: users@camel.apache.org
> Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> Hi
>
> You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.
>
> I wonder if you could modify the source code and try to "fix this"
> yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.
>
> Anyone else have other thoughts or comments?
>
> On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
>> Hi All,
>>
>> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>>
>> /**
>>          * Bind consumer to channel
>>          */
>>         private void start() throws IOException {
>>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>>         }
>>
>> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>>
>> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>>
>> if (handler != null) {
>>             correlation.remove(correlationID);
>>             handler.onReply(correlationID, properties, message);
>>         }
>>
>> Or TemporaryQueueReplyHandler line 56 - 59:
>>
>> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>>         // create holder object with the the reply
>>         http://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja5wGnZdh0g&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>>         // process the reply
>>         replyManager.processReply(holder);
>>     }
>>
>> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>>
>> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>>
>>
>> Thanks,
>> Valdis
>>
>> From: Valdis Andersons
>> Sent: 16 August 2018 16:10
>> To: users@camel.apache.org
>> Subject: Camel with Rabbitmq: messages in temp reply queue not being
>> acked
>>
>> Hi All,
>>
>> Hopefully someone here can educate me on the below matter.
>>
>> Here is a rather long-winded description of the issue:
>> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0s
>> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f5
>> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-
>> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726SgK
>> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2
>> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-t
>> emp-reply-queue-not-being-acked-when-au>
>>
>> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>>
>> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
>> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>>
>> rest(restEndpoint).post(postEndpoint)
>>                   .type(MyRequest.class)
>>                   .route()
>>                   .startupOrder(Integer.MAX_VALUE - 2)
>>                                   .process(this::process)
>>                                   .choice()
>>
>> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(ou
>> tputEmailEndpoint, outputArchiveEndpoint)
>>
>> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>>
>> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>>
>>
>> Thanks and Best Regards,
>> Valdis
>
>
>
> --
> Claus Ibsen
> -----------------
> http://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja88ImJgx1Q&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja8AGzZJnhg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2
>
> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi Insurance are regulated by the Central Bank of Ireland. Vhi Healthcare is tied to Vhi Insurance DAC for health insurance in Ireland which is underwritten by Vhi Insurance DAC. Vhi Healthcare is tied to Zurich Life Assurance plc for Vhi Life Term Insurance and Vhi Mortgage Protection which is underwritten by Zurich Life Assurance plc. Vhi Healthcare is tied to Collinson Insurance Services Limited for MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi Dental Insurance which are underwritten by Great Lakes Insurance SE, UK branch and for Vhi Canada Cover and Vhi International Health Insurance which are underwritten by Astrenska Insurance Limited. For more information about the Vhi Group please go to: https://www.vhi.ie/about-vhi.
>
>
> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare ceangailte le Collinson Insurance Services Limited le haghaidh Árachas Taistil Ilturais agus Turasóirí Mála Droma agus Árachas Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes Insurance SE, UK branch agus le haghaidh Clúdach Cheanada de chuid Vhi agus Árachas Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta ag Astrenska Insurance Limited. Chun tuilleadh faisnéise a fháil faoi Ghrúpa Vhi, tabhair cuairt ar: https://www.vhi.ie/about-vhi.
>
> This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.
>
>
>
>
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

RE: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Valdis Andersons <va...@vhi.ie>.
Hi Claus,

Thanks for your time and the response. I'll see if I can check out the code today and build it locally and then deploy to a test environment.
As you suggested, I'll make a change to the TemporaryQueueReplyManager at line 139 to always create a temp queue with autoAck=ture.
Please let me know if you see a better alternative or if I'm missing something.


Thanks,
Valdis

-----Original Message-----
From: Claus Ibsen [mailto:claus.ibsen@gmail.com] 
Sent: 21 August 2018 08:31
To: users@camel.apache.org
Subject: Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Hi

You may be on to something. It does smell like when using temporary queues for request/response then acknowledge should maybe be auto always, so the "other side" can see the message on the temporary queue, process it, and send back a reply message, for Camel to pickup.

I wonder if you could modify the source code and try to "fix this"
yourself and give it some testing, and then if so you are welcome to log a JIRA and provide a patch / github PR with the fix.

Anyone else have other thoughts or comments?

On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <va...@vhi.ie> wrote:
> Hi All,
>
> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>
> /**
>          * Bind consumer to channel
>          */
>         private void start() throws IOException {
>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>         }
>
> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>
> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>
> if (handler != null) {
>             correlation.remove(correlationID);
>             handler.onReply(correlationID, properties, message);
>         }
>
> Or TemporaryQueueReplyHandler line 56 - 59:
>
> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>         // create holder object with the the reply
>         http://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja5wGnZdh0g&s=33&u=http%3a%2f%2flog%2einfo%28"in onReply with correlationId= {}", correlationId);
>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>         // process the reply
>         replyManager.processReply(holder);
>     }
>
> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>
> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>
>
> Thanks,
> Valdis
>
> From: Valdis Andersons
> Sent: 16 August 2018 16:10
> To: users@camel.apache.org
> Subject: Camel with Rabbitmq: messages in temp reply queue not being 
> acked
>
> Hi All,
>
> Hopefully someone here can educate me on the below matter.
>
> Here is a rather long-winded description of the issue: 
> https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0s
> L9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f5
> 1875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-
> being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=57_726SgK
> 7xmPqWydExKj9kMkCZ0sL9Ja54FnsFiiA&s=33&u=https%3a%2f%2fstackoverflow%2
> ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-t
> emp-reply-queue-not-being-acked-when-au>
>
> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>
> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>
> rest(restEndpoint).post(postEndpoint)
>                   .type(MyRequest.class)
>                   .route()
>                   .startupOrder(Integer.MAX_VALUE - 2)
>                                   .process(this::process)
>                                   .choice()
>                                     
> .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(ou
> tputEmailEndpoint, outputArchiveEndpoint)
>
> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>
> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>
>
> Thanks and Best Regards,
> Valdis



--
Claus Ibsen
-----------------
http://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja88ImJgx1Q&s=33&u=http%3a%2f%2fdavsclaus%2ecom @davsclaus Camel in Action 2: https://scanmail.trustwave.com/?c=6600&d=57_726SgK7xmPqWydExKj9kMkCZ0sL9Ja8AGzZJnhg&s=33&u=https%3a%2f%2fwww%2emanning%2ecom%2fibsen2

Vhi Group DAC (Vhi) is a holding company for insurance and healthcare services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi Insurance are regulated by the Central Bank of Ireland. Vhi Healthcare is tied to Vhi Insurance DAC for health insurance in Ireland which is underwritten by Vhi Insurance DAC. Vhi Healthcare is tied to Zurich Life Assurance plc for Vhi Life Term Insurance and Vhi Mortgage Protection which is underwritten by Zurich Life Assurance plc. Vhi Healthcare is tied to Collinson Insurance Services Limited for MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi Dental Insurance which are underwritten by Great Lakes Insurance SE, UK branch and for Vhi Canada Cover and Vhi International Health Insurance which are underwritten by Astrenska Insurance Limited. For more information about the Vhi Group please go to: https://www.vhi.ie/about-vhi. 


Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare ceangailte le Collinson Insurance Services Limited le haghaidh Árachas Taistil Ilturais agus Turasóirí Mála Droma agus Árachas Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes Insurance SE, UK branch agus le haghaidh Clúdach Cheanada de chuid Vhi agus Árachas Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta ag Astrenska Insurance Limited. Chun tuilleadh faisnéise a fháil faoi Ghrúpa Vhi, tabhair cuairt ar: https://www.vhi.ie/about-vhi. 

This e-mail and any files transmitted with it contain information which may be confidential and which may also be privileged and is intended solely for the use of the individual or entity to whom it is addressed. Unless you are the intended recipient you may not copy or use it, or disclose it to anyone else. Any opinions expressed are that of the individual and not necessarily that of the Vhi Group. If you have received this e-mail in error please notify the sender by return.






Re: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

You may be on to something. It does smell like when using temporary
queues for request/response then acknowledge should maybe be auto
always, so the "other side" can see
the message on the temporary queue, process it, and send back a reply
message, for Camel to pickup.

I wonder if you could modify the source code and try to "fix this"
yourself and give it some testing, and then if so you are welcome to
log a JIRA and provide a patch / github PR with the fix.

Anyone else have other thoughts or comments?

On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons
<va...@vhi.ie> wrote:
> Hi All,
>
> Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:
>
> /**
>          * Bind consumer to channel
>          */
>         private void start() throws IOException {
>             tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>         }
>
> What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.
>
> My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:
>
> if (handler != null) {
>             correlation.remove(correlationID);
>             handler.onReply(correlationID, properties, message);
>         }
>
> Or TemporaryQueueReplyHandler line 56 - 59:
>
> public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>         // create holder object with the the reply
>         log.info("in onReply with correlationId= {}", correlationId);
>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>         // process the reply
>         replyManager.processReply(holder);
>     }
>
> The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.
>
> Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.
>
>
> Thanks,
> Valdis
>
> From: Valdis Andersons
> Sent: 16 August 2018 16:10
> To: users@camel.apache.org
> Subject: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> Hi All,
>
> Hopefully someone here can educate me on the below matter.
>
> Here is a rather long-winded description of the issue: https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=4pP1262DeccXeHHSPqygoqcKZcNR_lN14CUo2YIHRg&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au>
>
> The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.
>
> If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
> If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):
>
> rest(restEndpoint).post(postEndpoint)
>                   .type(MyRequest.class)
>                   .route()
>                   .startupOrder(Integer.MAX_VALUE - 2)
>                                   .process(this::process)
>                                   .choice()
>                                     .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint)
>
> I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.
>
> I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.
>
>
> Thanks and Best Regards,
> Valdis



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

RE: Camel with Rabbitmq: messages in temp reply queue not being acked

Posted by Valdis Andersons <va...@vhi.ie>.
Hi All,

Been digging today through some of the Camel-RabbitMQ code and can confirm now that the temp queue is created with the main endpoint's relevant properties (prefetch, autoAck and some others). The ack mode is set when the consumer get bound to the channel - TemporaryQueueReplyManager line 139:

/**
         * Bind consumer to channel
         */
        private void start() throws IOException {
            tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
        }

What I haven't been able to dig out is where the TemporaryQueueReplyManager or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp queue related logic would be sending the manual ack to RabbitMQ (channel.basicAck) in case the original endpoint, that the settings are taken from, had autoAck set to false.

My suspicion is that either the handler class or the manager class should be responsible for sending the manual ack, either TemporaryQueueReplyManager line 66-67:

if (handler != null) {
            correlation.remove(correlationID);
            handler.onReply(correlationID, properties, message);
        }

Or TemporaryQueueReplyHandler line 56 - 59:

public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
        // create holder object with the the reply
        log.info("in onReply with correlationId= {}", correlationId);
        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
        // process the reply
        replyManager.processReply(holder);
    }

The processReply method in ReplyManagerSupport doesn't seem to do it either, really lost on this one.

Could someone please point me to the place in the code where it's done? It might just maybe give me an idea or two of what I might be missing in my routing setup and how to get this working or at least how to work around this issue.


Thanks,
Valdis

From: Valdis Andersons
Sent: 16 August 2018 16:10
To: users@camel.apache.org
Subject: Camel with Rabbitmq: messages in temp reply queue not being acked

Hi All,

Hopefully someone here can educate me on the below matter.

Here is a rather long-winded description of the issue: https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au<https://scanmail.trustwave.com/?c=6600&d=4pP1262DeccXeHHSPqygoqcKZcNR_lN14CUo2YIHRg&s=33&u=https%3a%2f%2fstackoverflow%2ecom%2fquestions%2f51875646%2fapache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au>

The essence of it is that in an InOut route with autoAck=false on the main queue the messages on the temporary reply queue are not being acked even though they are being fetched as indicated by the prefetch setting and the amount of messages in an un-acked state on the temp queues.

If I set the autoAck=true then it works fine but in that case I'm risking losing messages and that's not what I want (we switched from SEDA to RabbitMQ for this reason among others).
If I set the exchange pattern to InOnly then it works fine as well as the route then doesn't need the temp queue for the reply. That unfortunately causes a race condition in this scenario (outputEmailEndpoint and outputArchiveEndpoint get the message at the same time, but email route needs to finish first before it can be archived):

rest(restEndpoint).post(postEndpoint)
                  .type(MyRequest.class)
                  .route()
                  .startupOrder(Integer.MAX_VALUE - 2)
                                  .process(this::process)
                                  .choice()
                                    .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint)

I'd also like to be sending the correct response status to the REST client calling this service and at the moment that doesn't quite work too well as the un-acked messages start blocking the route or the response will be sent before email route has finished.

I'm only new to Camel so I might be missing something very obvious here but the last few days of searching around and trying various config options haven't really got me anywhere further.


Thanks and Best Regards,
Valdis