Posted to issues@camel.apache.org by "Brad Harvey (Jira)" <ji...@apache.org> on 2020/09/25 15:22:00 UTC

[jira] [Updated] (CAMEL-15580) SJMS Batch Consumer startup race condition

     [ https://issues.apache.org/jira/browse/CAMEL-15580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad Harvey updated CAMEL-15580:
--------------------------------
    Attachment: potentialPatch.txt

> SJMS Batch Consumer startup race condition
> ------------------------------------------
>
>                 Key: CAMEL-15580
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15580
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 3.4.3
>            Reporter: Brad Harvey
>            Priority: Major
>         Attachments: potentialPatch.txt
>
>
> There is a race condition between the SJMS Batch Consumer route start thread and the batch consumption loop thread. When it triggers, the batch consumption loop exits early and the SJMS Batch Consumer does not read any JMS messages.
> In short:
>  * The AtomicBoolean running is used as a flag to shut down the batch consumption loop
>  * The batch consumption loop is submitted to another thread, and only after that is running set to true
>  * This means the batch consumption loop sometimes sees running as false during startup
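The three points above can be reproduced with nothing but the JDK. The sketch below is a hypothetical, minimal model (RaceDemo and runOnce are illustrative names, not Camel code): a loop guarded by an AtomicBoolean is submitted to an executor before the flag is set, and the starting thread is made to lose the race by waiting until the loop has already returned. That makes the bad interleaving deterministic instead of timing dependent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RaceDemo {

    // Returns how many times the loop body ran before the loop task returned.
    static int runOnce() throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(false);
        AtomicInteger iterations = new AtomicInteger();
        CountDownLatch loopReturned = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // 1. The loop is submitted first, as in StartConsumerTask#run...
            executor.submit(() -> {
                while (running.get()) {          // reads the flag before it has been set
                    iterations.incrementAndGet();
                }
                loopReturned.countDown();
            });
            // 2. ...the starting thread is "pre-empted" until the loop has given up...
            loopReturned.await();
            // 3. ...and only then sets running = true, which nothing reads any more.
            running.set(true);
            return iterations.get();
        } finally {
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("loop iterations = " + runOnce()); // prints "loop iterations = 0"
    }
}
```

In the real consumer the interleaving depends on scheduling, which would be consistent with the failure being intermittent in CI and hard to reproduce locally.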
> The easiest way to reproduce it is to add a sleep in SjmsBatchConsumer$StartConsumerTask#run:
>  
> {code:java}
> final List<AtomicBoolean> triggers = new ArrayList<>();
> for (int i = 0; i < consumerCount; i++) {
>     BatchConsumptionLoop loop = new BatchConsumptionLoop();
>     loop.setKeepAliveDelay(keepAliveDelay);
>     triggers.add(loop.getCompletionTimeoutTrigger());
>     /*
>      * Note: Batch consumption loop is submitted to another thread here
>      */
>     jmsConsumerExecutors.submit(loop);
> }
> if (completionInterval > 0) {
>     // trigger completion based on interval
>     timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
> }
> if (attempt > 1) {
>     LOG.info("Successfully refreshed connection after {} attempts.", attempt);
> }
> /*
>  * Note: Add this sleep to reproduce the race condition, simulating
>  * this thread being pre-empted by other work
>  */
> Thread.sleep(100);  
> LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
> /*
>  * Note: running is only changed to true here but the batch consumption loop
>  * that reads this value was submitted to another thread earlier
>  */
> running.set(true);
> return;
>  {code}
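One way to close the window, sketched here under the assumption that nothing else depends on running still being false while the consumers are created, is to set the flag before the loop is submitted. Actions taken before submitting a task to an ExecutorService happen-before the task starts, so with this ordering the loop cannot observe false at startup. FixedStartDemo, start, stop and consumeLoop are hypothetical names, and this is not necessarily what potentialPatch.txt does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FixedStartDemo {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CountDownLatch loopEntered = new CountDownLatch(1);

    // Hypothetical stand-in for StartConsumerTask#run with the flag set first.
    void start() {
        running.set(true);                   // set the flag BEFORE the loop can read it
        executor.submit(this::consumeLoop);  // the write above happens-before the task runs
    }

    private void consumeLoop() {
        while (running.get()) {
            loopEntered.countDown();         // the loop body ran at least once
            try {
                Thread.sleep(10);            // stand-in for consumer.receive(timeout)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean awaitLoopEntered(long millis) throws InterruptedException {
        return loopEntered.await(millis, TimeUnit.MILLISECONDS);
    }

    void stop() throws InterruptedException {
        running.set(false);                  // the loop exits after its current iteration
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        FixedStartDemo demo = new FixedStartDemo();
        demo.start();
        Thread.sleep(100);                   // the artificial delay from the issue no longer matters
        System.out.println("loop entered: " + demo.awaitLoopEntered(1000));
        demo.stop();
    }
}
```

If submission can fail (for example with a RejectedExecutionException), the flag would need to be reset to false on that path; the sketch leaves that out.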
>  
> The batch consumption loop checks the running flag like this:
> {code:java}
> private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer) throws JMSException {
>     final boolean usingTimeout = completionTimeout > 0;
>     LOG.trace("BatchConsumptionTask +++ start +++");
>     while (running.get()) {
>         // ... (rest of the method omitted)
> {code}
>  
> Usually a second check causes the whole loop to run again: even if the loop sees running as false, it may still see isStarting() as true.
> {code:java}
> } while (running.get() || isStarting());
> {code}
> However, with asyncStartListener enabled, I think isStarting() is likely to be false as well.
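The interaction between the two checks can be modelled on a single thread. In the sketch below, OuterLoopModel and consume are hypothetical names and the batch cap exists only so the model terminates. isStarting is a BooleanSupplier: in the synchronous case it reports true and, to simulate the start thread finishing, also sets running, so the outer do/while gets a second pass; in the asyncStartListener case it already reports false, so the task returns without consuming anything.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class OuterLoopModel {

    // Hypothetical model of the loop shape quoted above: an inner while (running)
    // loop wrapped in do { ... } while (running || isStarting()).
    // Returns how many batches were "consumed"; maxBatches keeps the model finite.
    static int consume(AtomicBoolean running, BooleanSupplier isStarting, int maxBatches) {
        int batches = 0;
        do {
            while (running.get() && batches < maxBatches) {
                batches++;                   // stand-in for reading and completing one batch
            }
        } while (batches < maxBatches && (running.get() || isStarting.getAsBoolean()));
        return batches;
    }

    public static void main(String[] args) {
        // Synchronous start: the route is still starting when the outer check runs,
        // and the start thread has set running = true by the time the loop retries.
        AtomicBoolean syncRunning = new AtomicBoolean(false);
        int sync = consume(syncRunning, () -> { syncRunning.set(true); return true; }, 3);

        // asyncStartListener: isStarting() is already false and running is not yet set.
        AtomicBoolean asyncRunning = new AtomicBoolean(false);
        int async = consume(asyncRunning, () -> false, 3);

        System.out.println("synchronous start: " + sync + " batches"); // 3
        System.out.println("async start: " + async + " batches");      // 0
    }
}
```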
>  
> I believe this issue is causing fairly frequent intermittent test failures in our CI environment (Jenkins agents in Kubernetes, Linux), but I have been unable to reproduce it on my laptop (Windows) without adding the artificial delay to the route start thread.
> I have been able to get thread dumps from the CI environment that show the executor thread waiting for a new task instead of executing the batch consumption loop:
> {code:java}
> "Camel (camel-8) thread #125 - SjmsBatchConsumer" 
>    java.lang.Thread.State: WAITING
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>  {code}
>  
> Normally they look like this:
> {code:java}
> "Camel (camel-8) thread #123 - SjmsBatchConsumer" 
>    java.lang.Thread.State: TIMED_WAITING
>         at java.lang.Object.wait(Native Method)
>         at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
>         at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
>         at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748) {code}
>  
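When only CI is affected, the difference between the two dumps can also be checked from inside a test instead of by reading thread dumps. A minimal sketch using only Thread.getAllStackTraces; ConsumerThreadCheck and anyThreadInMethod are hypothetical names, and the arguments would be, for example, SjmsBatchConsumer and consumeBatchesOnLoop, as they appear in the dumps above.

```java
import java.util.Map;

public class ConsumerThreadCheck {

    // Returns true if any live thread whose name contains namePart currently has
    // a stack frame for a method called methodName (e.g. "consumeBatchesOnLoop").
    static boolean anyThreadInMethod(String namePart, String methodName) {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            if (!entry.getKey().getName().contains(namePart)) {
                continue;                          // not a thread we are interested in
            }
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getMethodName().equals(methodName)) {
                    return true;                   // thread is inside the given method
                }
            }
        }
        return false;                              // no matching thread, or all are idle
    }

    public static void main(String[] args) {
        // In a Camel test this would run after the route has started; here it just reports.
        System.out.println(anyThreadInMethod("SjmsBatchConsumer", "consumeBatchesOnLoop"));
    }
}
```

A stack snapshot is only a point-in-time view, so a test would typically poll for a short while before concluding that the loop is not running.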
> I also get trace logs in which the batch consumption task starts and ends almost immediately:
> {code:java}
> 	Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Attempt #1. Starting 1 consumer(s) for myqueue:300
> 	Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ start +++
> 	Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ end +++
> 	Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Started 1 consumer(s) for myqueue:300 {code}
>  
> Side note: could the queue name be added to the thread name? The JMS component consumers already do that.
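For the thread-name suggestion, a plain-JDK sketch of the idea. NamedThreadFactory is a hypothetical class; inside Camel the name would more likely be supplied when the consumer's executor is created than through a custom ThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that embeds the destination name in every thread name.
public final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    public NamedThreadFactory(String component, String destinationName) {
        this.prefix = component + "[" + destinationName + "] #";
    }

    @Override
    public Thread newThread(Runnable task) {
        // e.g. "SjmsBatchConsumer[myqueue] #1", "SjmsBatchConsumer[myqueue] #2", ...
        Thread thread = new Thread(task, prefix + counter.incrementAndGet());
        thread.setDaemon(true);              // do not keep the JVM alive on its own
        return thread;
    }
}
```

Names of this form would make it easier to tell which destination an idle thread in a dump like the ones above belongs to.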
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)