Posted to issues@camel.apache.org by "Simon Rasmussen (Jira)" <ji...@apache.org> on 2022/12/06 08:31:00 UTC

[jira] [Commented] (CAMEL-18780) Sqs2Consumer message extended causing rejected execution exception when used with threads EIP

    [ https://issues.apache.org/jira/browse/CAMEL-18780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643719#comment-17643719 ] 

Simon Rasmussen commented on CAMEL-18780:
-----------------------------------------

We ended up adjusting the maxQueueSize of the route pool to something lower than 1000.

Having 1000 messages, potentially all needing to be extended via a single thread, wasn't the best idea in the first place. We now have the consumers set with a max queue size of 100-500 instead. This leaves plenty of task slots available for the extender, so it does not end up blocking.

I don't mind contributing an actual PR for fixing the problem - but I don't know what the proper solution would be. At least this ticket will serve as a reference for others who end up facing this issue.

> Sqs2Consumer message extended causing rejected execution exception when used with threads EIP
> ---------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-18780
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18780
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws2
>    Affects Versions: 3.19.0
>            Reporter: Simon Rasmussen
>            Priority: Major
>             Fix For: 3.18.5, 3.20.0
>
>
> The message extension feature of the Sqs2Consumer can cause rejected execution exceptions such as:
> {noformat}
> 2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer    : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
> java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
> 	at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
> 	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
> 	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
> 	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
> 	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
> 	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
> 	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
> {noformat}
> The consumer is configured with a default ThreadPoolProfile, and thus has a maxQueueSize of 1000.
> The message extender is running in its own scheduled executor which is instantiated within Sqs2Consumer:
> {code:java}
> this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
>                     .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
> {code}
> The extender therefore also uses the default thread pool profile, and so also has a maxQueueSize of 1000.
> A slowdown in processing the extension tasks can fill this inner queue, causing the exceptions to be thrown (and quickly flooding the logs).
> Possible solutions that I can think of would be to set the maxQueueSize of the SqsTimeoutExtender to 2x that of the consumer thread pool, or to set the maxQueueSize to unbounded (-1).
> The latter might be acceptable, since tasks are cancelled upon completion and thus the queue cannot grow unbounded.
> I can contribute a PR, but would need some guidance as to which solution would be preferable.
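
For readers who land on this ticket, the failure mode described in the quoted issue can be reproduced in isolation with plain java.util.concurrent. The sketch below is only an approximation: BoundedScheduler is a hypothetical stand-in for a size-limited scheduled executor, not Camel's actual SizedScheduledExecutorService, and the class name, the countRejections helper, and the task counts (1200 and 500) are assumptions chosen for illustration. Each scheduled no-op task stands for one in-flight message whose visibility timeout is waiting to be extended.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedExtenderSketch {

    /** Hypothetical stand-in for a size-limited scheduled executor (not Camel's class). */
    static final class BoundedScheduler {
        // Single worker thread, mirroring the single-threaded extender in the issue.
        private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
        private final int maxQueueSize; // assumption: values <= 0 mean "unbounded"

        BoundedScheduler(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
        }

        ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
            // Reject new work once the number of pending tasks reaches the limit.
            if (maxQueueSize > 0 && delegate.getQueue().size() >= maxQueueSize) {
                throw new RejectedExecutionException("Task rejected due queue size limit reached");
            }
            return delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
        }

        void shutdownNow() {
            delegate.shutdownNow();
        }
    }

    /** Schedules one periodic task per in-flight message and returns how many were rejected. */
    static int countRejections(int maxQueueSize, int inFlightMessages) {
        BoundedScheduler scheduler = new BoundedScheduler(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < inFlightMessages; i++) {
                try {
                    // The long delay keeps every task pending in the queue, like
                    // extension tasks for messages that are still being processed.
                    scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdownNow();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("limit 1000, 1200 in flight: " + countRejections(1000, 1200) + " rejected");
        System.out.println("limit 1000,  500 in flight: " + countRejections(1000, 500) + " rejected");
        System.out.println("unbounded,  1200 in flight: " + countRejections(-1, 1200) + " rejected");
    }
}
```

In this simplified model, rejections only start once the number of pending extension tasks reaches the extender's queue limit. Both the workaround from the comment (keeping the number of in-flight messages well below that limit) and the unbounded option from the issue description avoid the exception here; whether either is the right fix inside Camel itself is the open question in this ticket.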



--
This message was sent by Atlassian Jira
(v8.20.10#820010)