Posted to issues@camel.apache.org by "Brad Harvey (Jira)" <ji...@apache.org> on 2020/09/25 15:14:00 UTC

[jira] [Commented] (CAMEL-15580) SJMS Batch Consumer startup race condition

    [ https://issues.apache.org/jira/browse/CAMEL-15580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202234#comment-17202234 ] 

Brad Harvey commented on CAMEL-15580:
-------------------------------------

I added this test to SjmsBatchConsumerTest to try to reproduce the issue.  I had no luck reproducing it on my laptop until I added the Thread.sleep mentioned in the description: a sleep of 10 ms makes the test fail about half the time, and longer sleeps make it fail more often.
{code:java}
@Test
public void testStartupRaceCondition() throws Exception {
    final int routeCount = 10;
    final int consumerCount = 1;

    List<String> queues = new ArrayList<>();

    String queueNamePrefix = getQueueName();

    // setup routeCount routes, each reading from its own queue but all writing to the same mock endpoint
    for (int i = 0; i < routeCount; i++) {
        String queueName = queueNamePrefix + "_" + i;
        queues.add(queueName);
        String routeId = "batchConsumer_" + i;
        context.addRoutes(new RouteBuilder() {
            public void configure() throws Exception {

                int completionTimeout = 1000;
                int completionSize = 1;

                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=100&asyncStartListener=true",
                        queueName, completionTimeout, completionSize, consumerCount)
                        .routeId(routeId).autoStartup(true)
                        .split(body())
                        .to("mock:split");
            }
        });
    }

    context.start();

    // expect to receive routeCount messages to the mock endpoint
    MockEndpoint mockSplit = getMockEndpoint("mock:split");
    mockSplit.setExpectedMessageCount(routeCount);

    // send one message to all the queues
    queues.forEach(queueName -> template.sendBody("sjms:queue:" + queueName, queueName));

    assertMockEndpointsSatisfied();

} {code}
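
The ordering problem from the description can also be shown in isolation, without JMS or Camel. The following is only a minimal sketch, not the SjmsBatchConsumer code: the class StartupRaceSketch, the method runLoop and the three-iteration cap are hypothetical names and values made up for illustration. Instead of a sleep, the "flag set after submit" case waits for the loop task to finish before setting the flag, which makes the unlucky thread interleaving deterministic. Setting the flag before submitting is shown as one possible ordering that avoids the problem, not necessarily the fix that Camel should adopt.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone illustration; not part of camel-sjms.
public class StartupRaceSketch {

    /**
     * Runs a stand-in "consumption loop" on another thread and returns
     * how many iterations it completed before exiting.
     */
    static int runLoop(boolean setFlagBeforeSubmit) {
        final int maxIterations = 3; // arbitrary cap so the loop terminates
        AtomicBoolean running = new AtomicBoolean(false);
        AtomicInteger iterations = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            if (setFlagBeforeSubmit) {
                // Alternative ordering: the flag is visible before the loop can start
                running.set(true);
            }
            // The loop is handed to another thread, as in StartConsumerTask#run
            Future<?> loop = executor.submit(() -> {
                while (running.get() && iterations.get() < maxIterations) {
                    iterations.incrementAndGet();
                }
            });
            // Wait for the loop to exit. In the problematic ordering this stands in
            // for the starting thread being pre-empted before it sets the flag.
            loop.get();
            if (!setFlagBeforeSubmit) {
                // Too late: the loop has already seen running == false and exited
                running.set(true);
            }
            return iterations.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag set after submit:  " + runLoop(false) + " iterations");
        System.out.println("flag set before submit: " + runLoop(true) + " iterations");
    }
}
```

With the flag set after the submit, the loop does no work at all (0 iterations), which matches the "start" and "end" trace lines appearing back to back in the description. With the flag set first, the loop runs its 3 iterations.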

> SJMS Batch Consumer startup race condition
> ------------------------------------------
>
>                 Key: CAMEL-15580
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15580
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 3.4.3
>            Reporter: Brad Harvey
>            Priority: Major
>
> There is a race condition between the SJMS Batch Consumer route start thread and the batch consumption loop thread.  When it triggers, the batch consumption loop exits early and the SJMS Batch Consumer does not read any JMS messages.
> In short:
>  * The AtomicBoolean running is used as a flag to shut down the batch consumption loop
>  * The batch consumption loop is submitted to another thread and only after that running is changed to true
>  * This means sometimes the batch consumption loop sees running as false during startup
> The easiest way to reproduce it is to add a sleep into SjmsBatchConsumer$StartConsumerTask#run:
>  
> {code:java}
> final List<AtomicBoolean> triggers = new ArrayList<>();
> for (int i = 0; i < consumerCount; i++) {
>     BatchConsumptionLoop loop = new BatchConsumptionLoop();
>     loop.setKeepAliveDelay(keepAliveDelay);
>     triggers.add(loop.getCompletionTimeoutTrigger());
>     /*
>      * Note: Batch consumption loop is submitted to another thread here
>      */
>     jmsConsumerExecutors.submit(loop);
> }
> if (completionInterval > 0) {
>     // trigger completion based on interval
>     timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
> }
> if (attempt > 1) {
>     LOG.info("Successfully refreshed connection after {} attempts.", attempt);
> }
> /*
>  * Note: Add this sleep to reproduce the race condition, simulating
>  * this thread being pre-empted by other work
>  */
> Thread.sleep(100);  
> LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
> /*
>  * Note: running is only changed to true here, but the batch consumption loop
>  * that reads this value was submitted to another thread earlier
>  */
> running.set(true);
> return;
>  {code}
>  
> The batch consumption loop checks the running flag like this:
> {code:java}
>             private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer) throws JMSException {
>                 final boolean usingTimeout = completionTimeout > 0;
>                 LOG.trace("BatchConsumptionTask +++ start +++");
>                 while (running.get()) { {code}
>  
> Usually there's a second check that would cause everything to loop again - it may see running as false but see isStarting() as true.
> {code:java}
>                 }while (running.get() || isStarting()); {code}
> But with asyncStartListener enabled I think that isStarting() is likely to be false as well.
>  
> I believe this issue is causing fairly frequent intermittent test failures in our CI environment (Jenkins slaves in Kubernetes, Linux).  But I've been unable to reproduce it on my laptop (Windows) without adding the artificial delay on the main thread.  
> I've been able to get thread dumps from the CI environment showing the executor thread waiting for a task instead of executing the batch consumption loop:
> {code:java}
> "Camel (camel-8) thread #125 - SjmsBatchConsumer" 
>    java.lang.Thread.State: WAITING
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>  {code}
>  
> Usually they should look like this:
> {code:java}
> "Camel (camel-8) thread #123 - SjmsBatchConsumer" 
>    java.lang.Thread.State: TIMED_WAITING
>         at java.lang.Object.wait(Native Method)
>         at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
>         at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
>         at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748) {code}
>  
> I also get trace logs where the batch consumption task starts and ends very quickly:
> {code:java}
> 	Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Attempt #1. Starting 1 consumer(s) for myqueue:300
> 	Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ start +++
> 	Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ end +++
> 	Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Started 1 consumer(s) for myqueue:300 {code}
>  
> Side note: Could the queue name be added to the thread name?  The JMS component consumers do that.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)