You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/21 13:35:12 UTC

camel git commit: CAMEL-9974: Add completionPreidicate to camel-sjms batch component.

Repository: camel
Updated Branches:
  refs/heads/master 76a10b773 -> c3b236dbe


CAMEL-9974: Add completionPreidicate to camel-sjms batch component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3b236db
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3b236db
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3b236db

Branch: refs/heads/master
Commit: c3b236dbe0376285a2af6d691f3a1f9f5ba116fe
Parents: 76a10b7
Author: Claus Ibsen <da...@apache.org>
Authored: Sat May 21 15:35:02 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat May 21 15:35:02 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  2 +-
 .../camel-sjms/src/main/docs/sjms-batch.adoc    |  6 +++-
 .../component/sjms/batch/SjmsBatchConsumer.java | 25 +++++++++++++
 .../component/sjms/batch/SjmsBatchEndpoint.java | 38 ++++++++++++++++++++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 27 ++++++++++++++
 5 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index b0e8bd7..e14118e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -661,7 +661,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
 
     /**
      * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
-     * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
+     * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
      *
      * @return builder
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/docs/sjms-batch.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch.adoc b/components/camel-sjms/src/main/docs/sjms-batch.adoc
index 3bfaccd..715ae59 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch.adoc
@@ -134,8 +134,9 @@ The Simple JMS Batch component supports 1 options which are listed below.
 
 
 
+
 // endpoint options: START
-The Simple JMS Batch component supports 19 endpoint options which are listed below:
+The Simple JMS Batch component supports 21 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -146,9 +147,11 @@ The Simple JMS Batch component supports 19 endpoint options which are listed bel
 | allowNullBody | consumer | true | boolean | Whether to allow sending messages with no body. If this option is false and the message body is null then an JMSException is thrown.
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
 | completionInterval | consumer | 1000 | int | The completion interval in millis which causes batches to be completed in a scheduled fixed rate every interval. The batch may be empty if the timeout triggered and there was no messages in the batch. Notice you cannot use both completion timeout and completion interval at the same time only one can be configured.
+| completionPredicate | consumer |  | String | The completion predicate which causes batches to be completed when the predicate evaluates as true. The predicate can be configured using the simple language using the string syntax.
 | completionSize | consumer | 200 | int | The number of messages consumed at which the batch will be completed
 | completionTimeout | consumer | 500 | int | The timeout in millis from receipt of the first first message when the batch will be completed. The batch may be empty if the timeout triggered and there was no messages in the batch. Notice you cannot use both completion timeout and completion interval at the same time only one can be configured.
 | consumerCount | consumer | 1 | int | The number of JMS sessions to consume from
+| eagerCheckCompletion | consumer | false | boolean | Use eager completion checking which means that the completionPredicate will use the incoming Exchange. At opposed to without eager completion checking the completionPredicate will use the aggregated Exchange.
 | includeAllJMSXProperties | consumer | false | boolean | Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to true will include properties such as JMSXAppID and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply.
 | mapJmsMessage | consumer | true | boolean | Specifies whether Camel should auto map the received JMS message to a suited payload type such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details.
 | pollDuration | consumer | 1000 | int | The duration in milliseconds of each poll for messages. completionTimeOut will be used if it is shorter and a batch has started.
@@ -166,6 +169,7 @@ The Simple JMS Batch component supports 19 endpoint options which are listed bel
 
 
 
+
 The `completionSize` endpoint attribute is used in conjunction with
 `completionTimeout`, where the first condition to be met will cause the
 aggregated `Exchange` to be emitted down the route.

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 1fe5617..243c2b8 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -36,6 +36,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -63,6 +64,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     private final int completionSize;
     private final int completionInterval;
     private final int completionTimeout;
+    private final Predicate completionPredicate;
+    private final boolean eagerCheckCompletion;
     private final int consumerCount;
     private final int pollDuration;
     private final ConnectionFactory connectionFactory;
@@ -90,6 +93,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         if (sjmsBatchEndpoint.isSendEmptyMessageWhenIdle() && completionTimeout <= 0 && completionInterval <= 0) {
             throw new IllegalArgumentException("SendEmptyMessageWhenIdle can only be enabled if either completionInterval or completionTimeout is also set");
         }
+        completionPredicate = sjmsBatchEndpoint.getCompletionPredicate();
+        eagerCheckCompletion = sjmsBatchEndpoint.isEagerCheckCompletion();
 
         pollDuration = sjmsBatchEndpoint.getPollDuration();
         if (pollDuration < 0) {
@@ -328,6 +333,26 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             final Exchange exchange = getEndpoint().createExchange(message, session);
                             aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
                             aggregatedExchange.setProperty(Exchange.BATCH_SIZE, messageCount);
+
+                            // is the batch complete by predicate?
+                            if (completionPredicate != null) {
+                                try {
+                                    boolean complete;
+                                    if (eagerCheckCompletion) {
+                                        complete = completionPredicate.matches(exchange);
+                                    } else {
+                                        complete = completionPredicate.matches(aggregatedExchange);
+                                    }
+                                    if (complete) {
+                                        // trigger completion predicate
+                                        LOG.trace("Completion batch due predicate");
+                                        completionBatch(session);
+                                        reset();
+                                    }
+                                } catch (Exception e) {
+                                    LOG.warn("Error during evaluation of completion predicate " + e.getMessage() + ". This exception is ignored.", e);
+                                }
+                            }
                         }
 
                         if (usingTimeout && startTime > 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 9286000..4e06b87 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -23,6 +23,7 @@ import javax.jms.Session;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.sjms.SjmsHeaderFilterStrategy;
@@ -33,6 +34,7 @@ import org.apache.camel.component.sjms.jms.JmsBinding;
 import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -65,6 +67,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
     @UriParam(defaultValue = "1000")
     private int completionInterval;
+    @UriParam(javaType = "java.lang.String")
+    private Predicate completionPredicate;
+    @UriParam
+    private boolean eagerCheckCompletion;
     @UriParam
     private boolean sendEmptyMessageWhenIdle;
     @UriParam(defaultValue = "1000")
@@ -212,6 +218,38 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         this.completionInterval = completionInterval;
     }
 
+    public Predicate getCompletionPredicate() {
+        return completionPredicate;
+    }
+
+    /**
+     * The completion predicate, which causes batches to be completed when the predicate evaluates as true.
+     * <p/>
+     * The predicate can also be configured using the simple language using the string syntax.
+     * You may want to set the option eagerCheckCompletion to true to let the predicate match the incoming message,
+     * as otherwise it matches the aggregated message.
+     */
+    public void setCompletionPredicate(Predicate completionPredicate) {
+        this.completionPredicate = completionPredicate;
+    }
+
+    public void setCompletionPredicate(String predicate) {
+        // uses simple language
+        this.completionPredicate = SimpleLanguage.predicate(predicate);
+    }
+
+    public boolean isEagerCheckCompletion() {
+        return eagerCheckCompletion;
+    }
+
+    /**
+     * Use eager completion checking which means that the completionPredicate will use the incoming Exchange.
+     * As opposed to without eager completion checking the completionPredicate will use the aggregated Exchange.
+     */
+    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
+        this.eagerCheckCompletion = eagerCheckCompletion;
+    }
+
     public boolean isSendEmptyMessageWhenIdle() {
         return sendEmptyMessageWhenIdle;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index e9e5cc8..e378457 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -152,6 +152,33 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     }
 
     @Test
+    public void testConsumptionCompletionPredicate() throws Exception {
+        final String completionPredicate = "${body} contains 'done'";
+        final int completionTimeout = -1; // predicate-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjms-batch:%s?completionTimeout=%s&completionPredicate=%s&aggregationStrategy=#testStrategy&eagerCheckCompletion=true",
+                        queueName, completionTimeout, completionPredicate).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(2);
+
+        template.sendBody("direct:in", generateStrings(50));
+        template.sendBody("direct:in", "Message done");
+        template.sendBody("direct:in", generateStrings(50));
+        template.sendBody("direct:in", "Message done");
+        mockBatches.assertIsSatisfied();
+    }
+
+    @Test
     public void testConsumptionCompletionTimeout() throws Exception {
         final int completionTimeout = 2000;
         final int completionSize = -1; // timeout-based only