You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/16 03:59:37 UTC
[camel] branch camel-2.x updated: CAMEL-13955: camel-sjms batch now
supports aggregation strategy completion aware. Also add exchange property
with detail how it was completed like the Aggregate EIP does. (#3249)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
new 5be6548 CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does. (#3249)
5be6548 is described below
commit 5be6548b592c219af244f5dcce849a9e2ba002c2
Author: bradhgbst <54...@users.noreply.github.com>
AuthorDate: Wed Oct 16 13:59:27 2019 +1000
CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does. (#3249)
---
.../component/sjms/batch/SjmsBatchConsumer.java | 25 +++++++---
.../sjms/batch/SjmsBatchConsumerTest.java | 55 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 7 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 3e742f6..6ac149e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -41,6 +41,8 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -400,7 +402,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeout.compareAndSet(true, false) || timeoutInterval.compareAndSet(true, false)) {
// trigger timeout
LOG.trace("Completion batch due timeout");
- completionBatch(session);
+ String completedBy = completionInterval > 0 ? "interval" : "timeout";
+ completionBatch(session, completedBy);
reset();
continue;
}
@@ -408,7 +411,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (completionSize > 0 && messageCount >= completionSize) {
// trigger completion size
LOG.trace("Completion batch due size");
- completionBatch(session);
+ completionBatch(session, "size");
reset();
continue;
}
@@ -451,7 +454,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (complete) {
// trigger completion predicate
LOG.trace("Completion batch due predicate");
- completionBatch(session);
+ completionBatch(session, "predicate");
reset();
}
} catch (Exception e) {
@@ -489,12 +492,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
aggregatedExchange = null;
}
- private void completionBatch(final Session session) {
+ private void completionBatch(final Session session, String completedBy) {
// batch
if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
processEmptyMessage();
} else if (aggregatedExchange != null) {
- processBatch(aggregatedExchange, session);
+ processBatch(aggregatedExchange, session, completedBy);
}
}
@@ -543,7 +546,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
/**
* Send an message with the batches messages.
*/
- private void processBatch(Exchange exchange, Session session) {
+ private void processBatch(Exchange exchange, Session session, String completedBy) {
int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
@@ -551,6 +554,16 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
}
+ if ("timeout".equals(completedBy) && aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+ ((TimeoutAwareAggregationStrategy)aggregationStrategy).timeout(exchange, id, batchSize, completionTimeout);
+ }
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, completedBy);
+
+ // invoke the on completion callback
+ if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
+ ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
+ }
+
SessionCompletion sessionCompletion = new SessionCompletion(session);
exchange.addOnCompletion(sessionCompletion);
try {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 6afc288..a611be1 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,7 +25,9 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.util.toolbox.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.SjmsComponent;
@@ -153,6 +155,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -180,6 +185,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(50));
template.sendBody("direct:in", "Message done");
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("predicate", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -206,6 +214,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("timeout", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -233,6 +244,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("interval", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -383,8 +397,47 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
context.startRoute("batchConsumer");
assertMockEndpointsSatisfied();
- stopWatch.stop();
+ stopWatch.taken();
+ }
+
+ @Test
+ public void testConsumptionCompletionAware() throws Exception {
+ final int completionSize = 5;
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ ((SimpleRegistry)context.getRegistry()).put("groupedStrategy", AggregationStrategies.groupedBody());
+
+ fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+ queueName, completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1);
+
+ template.sendBody("direct:in", "A,B,C,D,E");
+
+ mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertNotNull(msg);
+
+ assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+ List body = msg.getBody(List.class);
+ assertNotNull(body);
+ assertEquals(5, body.size());
+ assertEquals("A", body.get(0));
+ assertEquals("B", body.get(1));
+ assertEquals("C", body.get(2));
+ assertEquals("D", body.get(3));
+ assertEquals("E", body.get(4));
}
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {