You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/09/27 07:58:33 UTC

[camel] branch master updated: CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new fc84b92  CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
fc84b92 is described below

commit fc84b925a948a594335241a984ae2565c72f1c7f
Author: Viral Gohel <vg...@redhat.com>
AuthorDate: Sun Sep 27 13:28:12 2020 +0530

    CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
---
 .../component/sjms/batch/SjmsBatchConsumer.java    |  2 +-
 .../sjms/batch/SjmsBatchConsumerTest.java          | 43 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 7809512..e72233d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -191,6 +191,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
                     // its success so prepare for exit
                     connection = localConnection;
+                    running.set(true);
 
                     final List<AtomicBoolean> triggers = new ArrayList<>();
                     for (int i = 0; i < consumerCount; i++) {
@@ -211,7 +212,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     }
 
                     LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
-                    running.set(true);
                     return;
                 } catch (Throwable e) {
                     // we failed so close the local connection as we create a new on next attempt
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 3861e46..9ebe6b2 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.sjms.batch;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -444,6 +445,48 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         assertEquals("E", body.get(4));
     }
 
+    @Test
+    public void testStartupRaceCondition() throws Exception {
+        final int routeCount = 10;
+        final int consumerCount = 1;
+
+        List<String> queues = new ArrayList<>();
+
+        String queueNamePrefix = getQueueName();
+
+        // setup routeCount routes, each reading from its own queue but all writing to the same mock endpoint
+        for (int i = 0; i < routeCount; i++) {
+            String queueName = queueNamePrefix + "_" + i;
+            queues.add(queueName);
+            String routeId = "batchConsumer_" + i;
+            context.addRoutes(new RouteBuilder() {
+                public void configure() throws Exception {
+
+                    int completionTimeout = 1000;
+                    int completionSize = 1;
+
+                    fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=100&asyncStartListener=true",
+                            queueName, completionTimeout, completionSize, consumerCount)
+                                    .routeId(routeId).autoStartup(true)
+                                    .split(body())
+                                    .to("mock:split");
+                }
+            });
+        }
+
+        context.start();
+
+        // expect to receive routeCount messages to the mock endpoint
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(routeCount);
+
+        // send one message to all the queues
+        queues.forEach(queueName -> template.sendBody("sjms:queue:" + queueName, queueName));
+
+        assertMockEndpointsSatisfied();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());