You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Valdis Andersons <va...@vhi.ie> on 2019/11/05 15:59:19 UTC

camel-rabbitmq sheduled route acks all messages when shutting down

Hi All,

Not sure I understand the issue fully, but I have scheduled route with this config (it's throttled as well):

CronScheduledRoutePolicy emailSchedulingPolicy = new CronScheduledRoutePolicy();
emailSchedulingPolicy.setRouteStartTime(emailNotificationsStartSchedule);
emailSchedulingPolicy.setRouteStopTime(emailNotificationsStopSchedule);

Processor exceptionLoggingProcessor = (exchange) ->
        logger.error("EmailNotificationRoute Error handled in camel.", exchange.getProperty(Exchange.EXCEPTION_CAUGHT));

setErrorHandlerBuilder(deadLetterChannel(deadLetterQueue)
        .onExceptionOccurred(exceptionLoggingProcessor));

onException(Exception.class)
        .logExhaustedMessageHistory(true)
        .useOriginalMessage();

from(emailNotificationInputEndpoint)
        .routeId(EMAIL_NOTIFICATIONS).routePolicy(emailSchedulingPolicy).autoStartup(autoStartup)
        .filter(notificationFilterPredicate)
        .throttle(maxEmailNotificationsThroughput)
        .timePeriodMillis(timePeriodMillisForThrottle)
        .process(emailNotificationProcessor)

        .to(emailNotificationOutEndpoint)
        .end();

The schedule is like this:

    startSchedule: 0 0 10 ? * TUE-FRI,SUN *
    stopSchedule: 0 0 15 ? * TUE-FRI,SUN *

So at 3pm the thing did shut down as expected but there was an error thrown in the logs (only thing I can see going wrong in the logs):

2019-11-05 15:00:11,358 [Camel (camel-1) thread #163 - ShutdownTask] ERROR o.a.c.c.r.RabbitConsumer  - Thread Interrupted!

The consumer config is this:

rabbitmq://vm1/emailnotificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailnotificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=9&threadPoolSize=9&channelPoolMaxSize=9&prefetchCount=1&prefetchEnabled=true&requestTimeout=90000&addresses=vm1,vm2,vm3

It's a clustered environment with the vm3 being a federated node. When the route shut down this happened:

[cid:image001.png@01D593F1.8F4BD250]

All of the 2k+ messages got somehow ack'd even though I have autoAck=false and this doesn't happen on a normal shutdown of the application. Am I missing something here, anyone would be able to explain why that could be happening? I'm a bit at a loss here. I was expecting the remaining messages to be just left on the queue after the route shut down (as prefetch is set to be only 1).


Best Regards and Thanks,
Valdis








RE: camel-rabbitmq sheduled route acks all messages when shutting down

Posted by Valdis Andersons <va...@vhi.ie>.
And another update.

By doing it the brutal way I'm getting my header. But why?

. when(body().regex(".*deliveryType:BASIC}")).to(outputEndpoint).to(outputBasickEndpoint)//.to(notificationRouterEndpoint)

...

private void process(Exchange exchange) {
    logger.info("RabbitMQ Persistence header after archiving: " + exchange.getIn().getHeader("rabbitmq.DELIVERY_MODE"));
..
           exchange.getIn().setHeader("rabbitmq.DELIVERY_MODE", 2);
           producerTemplate.send(notificationRouterEndpoint, exchange); //this is not even a hack anymore

}


I'm a bit confused as to why the choice/when expressions would be stripping out the rabbitmq headers. Or is it the chain of the three .to expressions, but I've .to all over the place in the application and the headers seem ok there. Though this is the only place with three chained together like that. I'm not finding anything suspicious about that in the documentation either. Please tell me it's something simple I'm missing here! I really would not like to resort to the above "solution".


Camel version is 2.21.3


Thanks,
Valdis

P.S. Tried it with the header expression instead of the body expression and it still wouldn't populate the header.

From: Valdis Andersons
Sent: 05 November 2019 21:54
To: users@camel.apache.org
Subject: RE: camel-rabbitmq sheduled route acks all messages when shutting down


CAUTION: This email originated from outside of VHI. Do not click links or open attachments unless you know the content is safe.



Hi All,

Did a bit of digging and the issue appears to be coming from upstream on the producer. Here is the route configuration for it:

from("direct:archive")
.log("Receiving ${body}")
.aggregate(header("aggregationkey"))
.aggregationStrategyRef(aggregationStrategy)
.aggregationRepositoryRef(repoName)
.completionSize(batchSize)
.completionTimeout(batchTimeout)
.log("Sending out ${body}")
.to(archiveProcessor)
      .split(body())
      .log("processing Split: ${body}")
      .process(this::process)
      .choice()
      .when(body().regex(".*deliveryType:BASIC}")).setHeader("rabbitmq.DELIVERY_MODE", simple("2")).to(outputEndpoint).to(outputBasickEndpoint).to(notificationRouterEndpoint)
      .when(body().regex(".*deliveryType:PRIORITY}")).to(outputEndpoint).to(outputPriorityEndpoint).to(priorityNotificationRouterEndpoint)
      .endChoice()
.end();

Issue stems from the "notificationRouterEndpoint" receiving a message that has had the DELIVERY_MODE header stripped out somewhere along the way. The other headers I'm setting on the exchange seem to be still there when I get the message directly from the RabbitMQ console but the Delivery Mode property seems to be gone. The above is my last attempt at forcing it to stick to the message but to avail.
Oddly enough the header seems to be there after the split and before the choice/when block:

private void process(Exchange exchange) {
    logger.info("RabbitMQ Persistence header after archiving: " + exchange.getIn().getHeader("rabbitmq.DELIVERY_MODE"));
}

Gives me this in the logs:

[2019-11-05 21:33:30,822 Camel (camel-1) thread #2 - AggregateTimeoutChecker] INFO  ie.vhi.cch.builders.BatchArchiveFeedbackRouteBuilder - RabbitMQ Persistence header after archiving: 2

By the time it gets to the rabbit queue the value is null. This is the rabbit queue config for notificationRouterEndpoint:

notificationRouterEndpoint: rabbitmq://vm1/notificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=notificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&exchangePattern=InOnly&channelPoolMaxSize=3
&addresses=vm1,vm2,vm3

The idea here is to send out notifications that something has been archived (batching the archive items helps with the load on the Archive Service). The splitting is there to facilitate individual notification processing for each archived item (since they can go different ways and also fail in all sorts of different ways).

Any help/advice/hint on the above would be greatly appreciated.


Thanks and Regards,
Valdis

From: Valdis Andersons
Sent: 05 November 2019 15:59
To: users@camel.apache.org<ma...@camel.apache.org>
Subject: camel-rabbitmq sheduled route acks all messages when shutting down

Hi All,

Not sure I understand the issue fully, but I have scheduled route with this config (it's throttled as well):

CronScheduledRoutePolicy emailSchedulingPolicy = new CronScheduledRoutePolicy();
emailSchedulingPolicy.setRouteStartTime(emailNotificationsStartSchedule);
emailSchedulingPolicy.setRouteStopTime(emailNotificationsStopSchedule);

Processor exceptionLoggingProcessor = (exchange) ->
        logger.error("EmailNotificationRoute Error handled in camel.", exchange.getProperty(Exchange.EXCEPTION_CAUGHT));

setErrorHandlerBuilder(deadLetterChannel(deadLetterQueue)
        .onExceptionOccurred(exceptionLoggingProcessor));

onException(Exception.class)
        .logExhaustedMessageHistory(true)
        .useOriginalMessage();

from(emailNotificationInputEndpoint)
        .routeId(EMAIL_NOTIFICATIONS).routePolicy(emailSchedulingPolicy).autoStartup(autoStartup)
        .filter(notificationFilterPredicate)
        .throttle(maxEmailNotificationsThroughput)
        .timePeriodMillis(timePeriodMillisForThrottle)
        .process(emailNotificationProcessor)

        .to(emailNotificationOutEndpoint)
        .end();

The schedule is like this:

    startSchedule: 0 0 10 ? * TUE-FRI,SUN *
    stopSchedule: 0 0 15 ? * TUE-FRI,SUN *

So at 3pm the thing did shut down as expected but there was an error thrown in the logs (only thing I can see going wrong in the logs):

2019-11-05 15:00:11,358 [Camel (camel-1) thread #163 - ShutdownTask] ERROR o.a.c.c.r.RabbitConsumer  - Thread Interrupted!

The consumer config is this:

rabbitmq://vm1/emailnotificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailnotificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=9&threadPoolSize=9&channelPoolMaxSize=9&prefetchCount=1&prefetchEnabled=true&requestTimeout=90000&addresses=vm1,vm2,vm3

It's a clustered environment with the vm3 being a federated node. When the route shut down this happened:

[cid:image001.png@01D59421.B32A1990]

All of the 2k+ messages got somehow ack'd even though I have autoAck=false and this doesn't happen on a normal shutdown of the application. Am I missing something here, anyone would be able to explain why that could be happening? I'm a bit at a loss here. I was expecting the remaining messages to be just left on the queue after the route shut down (as prefetch is set to be only 1).


Best Regards and Thanks,
Valdis








RE: camel-rabbitmq sheduled route acks all messages when shutting down

Posted by Valdis Andersons <va...@vhi.ie>.
Hi All,

Did a bit of digging and the issue appears to be coming from upstream on the producer. Here is the route configuration for it:

from("direct:archive")
.log("Receiving ${body}")
.aggregate(header("aggregationkey"))
.aggregationStrategyRef(aggregationStrategy)
.aggregationRepositoryRef(repoName)
.completionSize(batchSize)
.completionTimeout(batchTimeout)
.log("Sending out ${body}")
.to(archiveProcessor)
      .split(body())
      .log("processing Split: ${body}")
      .process(this::process)
      .choice()
      .when(body().regex(".*deliveryType:BASIC}")).setHeader("rabbitmq.DELIVERY_MODE", simple("2")).to(outputEndpoint).to(outputBasickEndpoint).to(notificationRouterEndpoint)
      .when(body().regex(".*deliveryType:PRIORITY}")).to(outputEndpoint).to(outputPriorityEndpoint).to(priorityNotificationRouterEndpoint)
      .endChoice()
.end();

Issue stems from the "notificationRouterEndpoint" receiving a message that has had the DELIVERY_MODE header stripped out somewhere along the way. The other headers I'm setting on the exchange seem to be still there when I get the message directly from the RabbitMQ console but the Delivery Mode property seems to be gone. The above is my last attempt at forcing it to stick to the message but to avail.
Oddly enough the header seems to be there after the split and before the choice/when block:

private void process(Exchange exchange) {
    logger.info("RabbitMQ Persistence header after archiving: " + exchange.getIn().getHeader("rabbitmq.DELIVERY_MODE"));
}

Gives me this in the logs:

[2019-11-05 21:33:30,822 Camel (camel-1) thread #2 - AggregateTimeoutChecker] INFO  ie.vhi.cch.builders.BatchArchiveFeedbackRouteBuilder - RabbitMQ Persistence header after archiving: 2

By the time it gets to the rabbit queue the value is null. This is the rabbit queue config for notificationRouterEndpoint:

notificationRouterEndpoint: rabbitmq://vm1/notificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=notificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&exchangePattern=InOnly&channelPoolMaxSize=3
&addresses=vm1,vm2,vm3

The idea here is to send out notifications that something has been archived (batching the archive items helps with the load on the Archive Service). The splitting is there to facilitate individual notification processing for each archived item (since they can go different ways and also fail in all sorts of different ways).

Any help/advice/hint on the above would be greatly appreciated.


Thanks and Regards,
Valdis

From: Valdis Andersons
Sent: 05 November 2019 15:59
To: users@camel.apache.org
Subject: camel-rabbitmq sheduled route acks all messages when shutting down

Hi All,

Not sure I understand the issue fully, but I have scheduled route with this config (it's throttled as well):

CronScheduledRoutePolicy emailSchedulingPolicy = new CronScheduledRoutePolicy();
emailSchedulingPolicy.setRouteStartTime(emailNotificationsStartSchedule);
emailSchedulingPolicy.setRouteStopTime(emailNotificationsStopSchedule);

Processor exceptionLoggingProcessor = (exchange) ->
        logger.error("EmailNotificationRoute Error handled in camel.", exchange.getProperty(Exchange.EXCEPTION_CAUGHT));

setErrorHandlerBuilder(deadLetterChannel(deadLetterQueue)
        .onExceptionOccurred(exceptionLoggingProcessor));

onException(Exception.class)
        .logExhaustedMessageHistory(true)
        .useOriginalMessage();

from(emailNotificationInputEndpoint)
        .routeId(EMAIL_NOTIFICATIONS).routePolicy(emailSchedulingPolicy).autoStartup(autoStartup)
        .filter(notificationFilterPredicate)
        .throttle(maxEmailNotificationsThroughput)
        .timePeriodMillis(timePeriodMillisForThrottle)
        .process(emailNotificationProcessor)

        .to(emailNotificationOutEndpoint)
        .end();

The schedule is like this:

    startSchedule: 0 0 10 ? * TUE-FRI,SUN *
    stopSchedule: 0 0 15 ? * TUE-FRI,SUN *

So at 3pm the thing did shut down as expected but there was an error thrown in the logs (only thing I can see going wrong in the logs):

2019-11-05 15:00:11,358 [Camel (camel-1) thread #163 - ShutdownTask] ERROR o.a.c.c.r.RabbitConsumer  - Thread Interrupted!

The consumer config is this:

rabbitmq://vm1/emailnotificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailnotificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=9&threadPoolSize=9&channelPoolMaxSize=9&prefetchCount=1&prefetchEnabled=true&requestTimeout=90000&addresses=vm1,vm2,vm3

It's a clustered environment with the vm3 being a federated node. When the route shut down this happened:

[cid:image001.png@01D59421.B32A1990]

All of the 2k+ messages got somehow ack'd even though I have autoAck=false and this doesn't happen on a normal shutdown of the application. Am I missing something here, anyone would be able to explain why that could be happening? I'm a bit at a loss here. I was expecting the remaining messages to be just left on the queue after the route shut down (as prefetch is set to be only 1).


Best Regards and Thanks,
Valdis