You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/09/30 18:17:36 UTC
[camel] branch camel-3.4.x updated: CAMEL-15580: SJMS Batch
Consumer startup race condition (#4306)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
new 1679e96 CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
1679e96 is described below
commit 1679e965ee004a0031997bbbb725b9ce4f5036c3
Author: Viral Gohel <vg...@redhat.com>
AuthorDate: Sun Sep 27 13:28:12 2020 +0530
CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
---
.../component/sjms/batch/SjmsBatchConsumer.java | 2 +-
.../sjms/batch/SjmsBatchConsumerTest.java | 43 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 6452fc6..daed6d9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -185,6 +185,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
// its success so prepare for exit
connection = localConnection;
+ running.set(true);
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
@@ -204,7 +205,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
- running.set(true);
return;
} catch (Throwable e) {
// we failed so close the local connection as we create a new on next attempt
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index a2df865..4037369 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.sjms.batch;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -440,6 +441,48 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
assertEquals("E", body.get(4));
}
+ @Test
+ public void testStartupRaceCondition() throws Exception {
+ final int routeCount = 10;
+ final int consumerCount = 1;
+
+ List<String> queues = new ArrayList<>();
+
+ String queueNamePrefix = getQueueName();
+
+ // setup routeCount routes, each reading from its own queue but all writing to the same mock endpoint
+ for (int i = 0; i < routeCount; i++) {
+ String queueName = queueNamePrefix + "_" + i;
+ queues.add(queueName);
+ String routeId = "batchConsumer_" + i;
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+
+ int completionTimeout = 1000;
+ int completionSize = 1;
+
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=100&asyncStartListener=true",
+ queueName, completionTimeout, completionSize, consumerCount)
+ .routeId(routeId).autoStartup(true)
+ .split(body())
+ .to("mock:split");
+ }
+ });
+ }
+
+ context.start();
+
+ // expect to receive routeCount messages to the mock endpoint
+ MockEndpoint mockSplit = getMockEndpoint("mock:split");
+ mockSplit.setExpectedMessageCount(routeCount);
+
+ // send one message to all the queues
+ queues.forEach(queueName -> template.sendBody("sjms:queue:" + queueName, queueName));
+
+ assertMockEndpointsSatisfied();
+
+ }
+
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
Exchange exchange = mockEndpoint.getExchanges().get(0);
assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());