You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/07 10:52:33 UTC

[camel] branch master updated: CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f31679d  CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does.
f31679d is described below

commit f31679dc3b98f451806d6ff6b95b50b44e3fbbfe
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 7 12:52:05 2019 +0200

    CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does.
---
 .../component/sjms/batch/SjmsBatchConsumer.java    | 21 ++++++---
 .../sjms/batch/SjmsBatchConsumerTest.java          | 54 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 4659b0e..ec6168e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -403,7 +403,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (timeout.compareAndSet(true, false) || timeoutInterval.compareAndSet(true, false)) {
                         // trigger timeout
                         LOG.trace("Completion batch due timeout");
-                        completionBatch(session);
+                        String completedBy = completionInterval > 0 ? "interval" : "timeout";
+                        completionBatch(session, completedBy);
                         reset();
                         continue;
                     }
@@ -411,7 +412,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (completionSize > 0 && messageCount >= completionSize) {
                         // trigger completion size
                         LOG.trace("Completion batch due size");
-                        completionBatch(session);
+                        completionBatch(session, "size");
                         reset();
                         continue;
                     }
@@ -454,7 +455,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                     if (complete) {
                                         // trigger completion predicate
                                         LOG.trace("Completion batch due predicate");
-                                        completionBatch(session);
+                                        completionBatch(session, "predicate");
                                         reset();
                                     }
                                 } catch (Exception e) {
@@ -492,12 +493,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 aggregatedExchange = null;
             }
 
-            private void completionBatch(final Session session) {
+            private void completionBatch(final Session session, String completedBy) {
                 // batch
                 if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
                     processEmptyMessage();
                 } else if (aggregatedExchange != null) {
-                    processBatch(aggregatedExchange, session);
+                    processBatch(aggregatedExchange, session, completedBy);
                 }
             }
 
@@ -546,7 +547,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         /**
          * Send an message with the batches messages.
          */
-        private void processBatch(Exchange exchange, Session session) {
+        private void processBatch(Exchange exchange, Session session, String completedBy) {
             int id = BATCH_COUNT.getAndIncrement();
             int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
@@ -554,6 +555,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
             }
 
+            if ("timeout".equals(completedBy)) {
+                aggregationStrategy.timeout(exchange, id, batchSize, completionTimeout);
+            }
+            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, completedBy);
+
+            // invoke the on completion callback
+            aggregationStrategy.onCompletion(exchange);
+
             SessionCompletion sessionCompletion = new SessionCompletion(session);
             exchange.addOnCompletion(sessionCompletion);
             try {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 3b3d59d..ecf7d10 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,12 +25,15 @@ import javax.jms.ConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
 import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
 import org.apache.camel.support.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.StopWatch;
@@ -153,6 +156,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -180,6 +186,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(50));
         template.sendBody("direct:in", "Message done");
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("predicate", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -206,6 +215,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
         assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("timeout", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -233,6 +245,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(messageCount));
 
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("interval", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -384,7 +399,46 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
         assertMockEndpointsSatisfied();
         stopWatch.taken();
+    }
+
+    @Test
+    public void testConsumptionCompletionAware() throws Exception {
+        final int completionSize = 5;
 
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                context.getRegistry().bind("groupedStrategy", AggregationStrategies.groupedBody());
+
+                fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+                        queueName, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(1);
+
+        template.sendBody("direct:in", "A,B,C,D,E");
+
+        mockBatches.assertIsSatisfied();
+        
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertNotNull(msg);
+
+        assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+        List body = msg.getBody(List.class);
+        assertNotNull(body);
+        assertEquals(5, body.size());
+        assertEquals("A", body.get(0));
+        assertEquals("B", body.get(1));
+        assertEquals("C", body.get(2));
+        assertEquals("D", body.get(3));
+        assertEquals("E", body.get(4));
     }
 
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {