You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/07 10:52:33 UTC
[camel] branch master updated: CAMEL-13955: camel-sjms batch now
supports aggregation strategy completion aware. Also add exchange property
with detail how it was completed like the Aggregate EIP does.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new f31679d CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does.
f31679d is described below
commit f31679dc3b98f451806d6ff6b95b50b44e3fbbfe
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 7 12:52:05 2019 +0200
CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does.
---
.../component/sjms/batch/SjmsBatchConsumer.java | 21 ++++++---
.../sjms/batch/SjmsBatchConsumerTest.java | 54 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 4659b0e..ec6168e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -403,7 +403,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeout.compareAndSet(true, false) || timeoutInterval.compareAndSet(true, false)) {
// trigger timeout
LOG.trace("Completion batch due timeout");
- completionBatch(session);
+ String completedBy = completionInterval > 0 ? "interval" : "timeout";
+ completionBatch(session, completedBy);
reset();
continue;
}
@@ -411,7 +412,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (completionSize > 0 && messageCount >= completionSize) {
// trigger completion size
LOG.trace("Completion batch due size");
- completionBatch(session);
+ completionBatch(session, "size");
reset();
continue;
}
@@ -454,7 +455,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (complete) {
// trigger completion predicate
LOG.trace("Completion batch due predicate");
- completionBatch(session);
+ completionBatch(session, "predicate");
reset();
}
} catch (Exception e) {
@@ -492,12 +493,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
aggregatedExchange = null;
}
- private void completionBatch(final Session session) {
+ private void completionBatch(final Session session, String completedBy) {
// batch
if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
processEmptyMessage();
} else if (aggregatedExchange != null) {
- processBatch(aggregatedExchange, session);
+ processBatch(aggregatedExchange, session, completedBy);
}
}
@@ -546,7 +547,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
/**
* Send an message with the batches messages.
*/
- private void processBatch(Exchange exchange, Session session) {
+ private void processBatch(Exchange exchange, Session session, String completedBy) {
int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
@@ -554,6 +555,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
}
+ if ("timeout".equals(completedBy)) {
+ aggregationStrategy.timeout(exchange, id, batchSize, completionTimeout);
+ }
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, completedBy);
+
+ // invoke the on completion callback
+ aggregationStrategy.onCompletion(exchange);
+
SessionCompletion sessionCompletion = new SessionCompletion(session);
exchange.addOnCompletion(sessionCompletion);
try {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 3b3d59d..ecf7d10 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,12 +25,15 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.support.MockConnectionFactory;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
import org.apache.camel.support.SimpleRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.StopWatch;
@@ -153,6 +156,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -180,6 +186,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(50));
template.sendBody("direct:in", "Message done");
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("predicate", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -206,6 +215,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("timeout", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -233,6 +245,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("interval", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -384,7 +399,46 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
assertMockEndpointsSatisfied();
stopWatch.taken();
+ }
+
+ @Test
+ public void testConsumptionCompletionAware() throws Exception {
+ final int completionSize = 5;
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ context.getRegistry().bind("groupedStrategy", AggregationStrategies.groupedBody());
+
+ fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+ queueName, completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1);
+
+ template.sendBody("direct:in", "A,B,C,D,E");
+
+ mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertNotNull(msg);
+
+ assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+ List body = msg.getBody(List.class);
+ assertNotNull(body);
+ assertEquals(5, body.size());
+ assertEquals("A", body.get(0));
+ assertEquals("B", body.get(1));
+ assertEquals("C", body.get(2));
+ assertEquals("D", body.get(3));
+ assertEquals("E", body.get(4));
}
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {