You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/16 03:59:37 UTC

[camel] branch camel-2.x updated: CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does. (#3249)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new 5be6548  CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does. (#3249)
5be6548 is described below

commit 5be6548b592c219af244f5dcce849a9e2ba002c2
Author: bradhgbst <54...@users.noreply.github.com>
AuthorDate: Wed Oct 16 13:59:27 2019 +1000

    CAMEL-13955: camel-sjms batch now supports aggregation strategy completion aware. Also add exchange property with detail how it was completed like the Aggregate EIP does. (#3249)
---
 .../component/sjms/batch/SjmsBatchConsumer.java    | 25 +++++++---
 .../sjms/batch/SjmsBatchConsumerTest.java          | 55 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 3e742f6..6ac149e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -41,6 +41,8 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -400,7 +402,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (timeout.compareAndSet(true, false) || timeoutInterval.compareAndSet(true, false)) {
                         // trigger timeout
                         LOG.trace("Completion batch due timeout");
-                        completionBatch(session);
+                        String completedBy = completionInterval > 0 ? "interval" : "timeout";
+                        completionBatch(session, completedBy);
                         reset();
                         continue;
                     }
@@ -408,7 +411,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (completionSize > 0 && messageCount >= completionSize) {
                         // trigger completion size
                         LOG.trace("Completion batch due size");
-                        completionBatch(session);
+                        completionBatch(session, "size");
                         reset();
                         continue;
                     }
@@ -451,7 +454,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                     if (complete) {
                                         // trigger completion predicate
                                         LOG.trace("Completion batch due predicate");
-                                        completionBatch(session);
+                                        completionBatch(session, "predicate");
                                         reset();
                                     }
                                 } catch (Exception e) {
@@ -489,12 +492,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 aggregatedExchange = null;
             }
 
-            private void completionBatch(final Session session) {
+            private void completionBatch(final Session session, String completedBy) {
                 // batch
                 if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
                     processEmptyMessage();
                 } else if (aggregatedExchange != null) {
-                    processBatch(aggregatedExchange, session);
+                    processBatch(aggregatedExchange, session, completedBy);
                 }
             }
 
@@ -543,7 +546,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         /**
          * Send an message with the batches messages.
          */
-        private void processBatch(Exchange exchange, Session session) {
+        private void processBatch(Exchange exchange, Session session, String completedBy) {
             int id = BATCH_COUNT.getAndIncrement();
             int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
@@ -551,6 +554,16 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
             }
 
+            if ("timeout".equals(completedBy) && aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+                ((TimeoutAwareAggregationStrategy)aggregationStrategy).timeout(exchange, id, batchSize, completionTimeout);
+            }
+            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, completedBy);
+
+            // invoke the on completion callback
+            if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
+                ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
+            }
+
             SessionCompletion sessionCompletion = new SessionCompletion(session);
             exchange.addOnCompletion(sessionCompletion);
             try {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 6afc288..a611be1 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,7 +25,9 @@ import javax.jms.ConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.util.toolbox.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
@@ -153,6 +155,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -180,6 +185,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(50));
         template.sendBody("direct:in", "Message done");
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("predicate", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -206,6 +214,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
         assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("timeout", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -233,6 +244,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         template.sendBody("direct:in", generateStrings(messageCount));
 
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("interval", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -383,8 +397,47 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         context.startRoute("batchConsumer");
 
         assertMockEndpointsSatisfied();
-        stopWatch.stop();
+        stopWatch.taken();
+    }
+
+    @Test
+    public void testConsumptionCompletionAware() throws Exception {
+        final int completionSize = 5;
 
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                ((SimpleRegistry)context.getRegistry()).put("groupedStrategy", AggregationStrategies.groupedBody());
+
+                fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+                        queueName, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(1);
+
+        template.sendBody("direct:in", "A,B,C,D,E");
+
+        mockBatches.assertIsSatisfied();
+        
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertNotNull(msg);
+
+        assertEquals("size", msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+        List body = msg.getBody(List.class);
+        assertNotNull(body);
+        assertEquals(5, body.size());
+        assertEquals("A", body.get(0));
+        assertEquals("B", body.get(1));
+        assertEquals("C", body.get(2));
+        assertEquals("D", body.get(3));
+        assertEquals("E", body.get(4));
     }
 
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {