You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (Jira)" <ji...@apache.org> on 2023/01/15 12:58:00 UTC

[jira] [Assigned] (CAMEL-18780) Sqs2Consumer message extended causing rejected execution exception when used with threads EIP

     [ https://issues.apache.org/jira/browse/CAMEL-18780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-18780:
-----------------------------------

    Assignee: Simon Rasmussen

> Sqs2Consumer message extended causing rejected execution exception when used with threads EIP
> ---------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-18780
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18780
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws2
>    Affects Versions: 3.19.0
>            Reporter: Simon Rasmussen
>            Assignee: Simon Rasmussen
>            Priority: Major
>             Fix For: 3.20.2, 3.21.0
>
>
> The message extension feature of the Sqs2Consumer can cause rejected execution exceptions such as:
> {noformat}
> 2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer    : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
> java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
> 	at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
> 	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
> 	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
> 	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
> 	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
> 	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
> 	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
> {noformat}
> The consumer is configured with a default ThreadPoolPofile, and thus has a maxQueueSize of 1000.
> The message extender is running in its own scheduled executor which is instantiated within Sqs2Consumer:
> {code:java}
> this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
>                     .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
> {code}
> Thus, also using the default thread pool profile, and thus a maxQueueSize of 1000.
> A slowdown of processing the extending tasks can lead to this inner queue being filled, causing the exceptions to be thrown (quickly flooding the logs).
> Possible solutions that I can think of would be to set the maxQueueSize of the SqsTimeoutExtender to 2x of the consumer thread pool or set the maxQueueSize to unbound (-1). 
> The latter might be acceptable tasks are cancelled upon completing and thus cannot grow unbound.
> I can contribute a PR, but would need some guidance as to which solution our be preferable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)