You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/07/05 19:26:18 UTC

camel git commit: CAMEL-11515: Add option to configure how frequent to run the timeout checker in the aggregate EIP. Also add better docs about completion timeout.

Repository: camel
Updated Branches:
  refs/heads/master abea861b4 -> d4e01d7a6


CAMEL-11515: Add option to configure how frequent to run the timeout checker in the aggregate EIP. Also add better docs about completion timeout.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d4e01d7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d4e01d7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d4e01d7a

Branch: refs/heads/master
Commit: d4e01d7a6c1af9ddd21e8774a75932529bc6c9ab
Parents: abea861
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 5 21:23:14 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 5 21:23:14 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/eips/aggregate-eip.adoc       |  7 ++--
 .../mbean/ManagedAggregateProcessorMBean.java   |  3 ++
 .../mbean/ManagedAggregateProcessor.java        |  4 +++
 .../apache/camel/model/AggregateDefinition.java | 38 ++++++++++++++++++++
 .../processor/aggregate/AggregateProcessor.java | 12 +++++--
 5 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/docs/eips/aggregate-eip.adoc
----------------------------------------------------------------------
diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index 19c5d9d..5cdbb64 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -19,7 +19,7 @@ single correlation key into a single message exchange.
 ### Aggregator options
 
 // eip options: START
-The Aggregate EIP supports 25 options which are listed below:
+The Aggregate EIP supports 26 options which are listed below:
 
 
 [width="100%",cols="3,1m,6",options="header"]
@@ -27,7 +27,7 @@ The Aggregate EIP supports 25 options which are listed below:
 | Name | Java Type | Description
 | correlationExpression | NamespaceAwareExpression | *Required* The expression used to calculate the correlation key to use for aggregation. The Exchange which has the same correlation key is aggregated together. If the correlation key could not be evaluated an Exception is thrown. You can disable this by using the ignoreBadCorrelationKeys option.
 | completionPredicate | NamespaceAwareExpression | A Predicate to indicate when an aggregated exchange is complete. If this is not specified and the AggregationStrategy object implements Predicate the aggregationStrategy object will be used as the completionPredicate.
-| completionTimeout | NamespaceAwareExpression | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval only one of the two can be used.
+| completionTimeout | NamespaceAwareExpression | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval only one of the two can be used. By default the timeout checker runs every second you can use the completionTimeoutCheckerInterval option to configure how frequently to run the checker. The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. It is not recommended to use very low timeout values or checker intervals.
 | completionSize | NamespaceAwareExpression | Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
 | optimisticLockRetryPolicy | OptimisticLockRetryPolicyDefinition | Allows to configure retry settings when using optimistic locking.
 | parallelProcessing | Boolean | When aggregated are completed they are being send out of the aggregator. This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
@@ -40,7 +40,8 @@ The Aggregate EIP supports 25 options which are listed below:
 | strategyMethodAllowNull | Boolean | If this option is false then the aggregate method is not used for the very first aggregation. If this option is true then null values is used as the oldExchange (at the very first aggregation) when using POJOs as the AggregationStrategy.
 | completionSize | Integer | Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
 | completionInterval | Long | A repeating period in millis by which the aggregator will complete all current aggregated exchanges. Camel has a background task which is triggered every period. You cannot use this option together with completionTimeout only one of them can be used.
-| completionTimeout | Long | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval only one of the two can be used.
+| completionTimeout | Long | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval only one of the two can be used. By default the timeout checker runs every second you can use the completionTimeoutCheckerInterval option to configure how frequently to run the checker. The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. It is not recommended to use very low timeout values or checker intervals.
+| completionTimeoutCheckerInterval | Long | Interval in millis that is used by the background task that checks for timeouts (org.apache.camel.TimeoutMap). By default the timeout checker runs every second. The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. It is not recommended to use very low timeout values or checker intervals.
 | completionFromBatchConsumer | Boolean | Enables the batch completion mode where we aggregate from a org.apache.camel.BatchConsumer and aggregate the total number of exchanges the org.apache.camel.BatchConsumer has reported as total by checking the exchange property link org.apache.camel.ExchangeBATCH_COMPLETE when its complete.
 | groupExchanges | Boolean | Enables grouped exchanges so the aggregator will group all aggregated exchanges into a single combined Exchange holding all the aggregated exchanges in a java.util.List.
 | eagerCheckCompletion | Boolean | Use eager completion checking which means that the completionPredicate will use the incoming Exchange. As opposed to without eager completion checking the completionPredicate will use the aggregated Exchange.

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index 4944422..fa978c5 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -39,6 +39,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Completion interval in millis")
     long getCompletionInterval();
 
+    @ManagedAttribute(description = "Completion timeout checker interval in millis")
+    long getCompletionTimeoutCheckerInterval();
+
     @ManagedAttribute(description = "Completion size")
     int getCompletionSize();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e4c167e..1f68652 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -90,6 +90,10 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
         return processor.getCompletionInterval();
     }
 
+    public long getCompletionTimeoutCheckerInterval() {
+        return processor.getCompletionTimeoutCheckerInterval();
+    }
+
     public int getCompletionSize() {
         return processor.getCompletionSize();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index d5044a0..86b4032 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -103,6 +103,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     private Long completionInterval;
     @XmlAttribute
     private Long completionTimeout;
+    @XmlAttribute @Metadata(defaultValue = "1000")
+    private Long completionTimeoutCheckerInterval = 1000L;
     @XmlAttribute
     private Boolean completionFromBatchConsumer;
     @XmlAttribute
@@ -283,6 +285,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         if (getAggregateController() != null) {
             answer.setAggregateController(getAggregateController());
         }
+        if (getCompletionTimeoutCheckerInterval() != null) {
+            answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval());
+        }
         return answer;
     }
 
@@ -488,6 +493,14 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         this.completionTimeout = completionTimeout;
     }
 
+    public Long getCompletionTimeoutCheckerInterval() {
+        return completionTimeoutCheckerInterval;
+    }
+
+    public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval) {
+        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
+    }
+
     public ExpressionSubElementDefinition getCompletionPredicate() {
         return completionPredicate;
     }
@@ -768,6 +781,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      * a timeout dynamically - will use Long as result.
      * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
      * You cannot use this option together with completionInterval, only one of the two can be used.
+     * <p/>
+     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
+     * to configure how frequently to run the checker.
+     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
      *
      * @param completionTimeout  the timeout in millis, must be a positive value
      * @return the builder
@@ -783,6 +801,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      * a timeout dynamically - will use Long as result.
      * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
      * You cannot use this option together with completionInterval, only one of the two can be used.
+     * <p/>
+     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
+     * to configure how frequently to run the checker.
+     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
      *
      * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
      * @return the builder
@@ -793,6 +816,21 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Interval in millis that is used by the background task that checks for timeouts ({@link org.apache.camel.TimeoutMap}).
+     * <p/>
+     * By default the timeout checker runs every second.
+     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
+     *
+     * @param completionTimeoutCheckerInterval  the interval in millis, must be a positive value
+     * @return the builder
+     */
+    public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
+        setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval);
+        return this;
+    }
+
+    /**
      * Sets the AggregationStrategy to use with a fluent builder.
      */
     public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 53d048d..50978a0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -61,7 +61,6 @@ import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -210,6 +209,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private boolean discardOnCompletionTimeout;
     private boolean forceCompletionOnStop;
     private boolean completeAllOnStop;
+    private long completionTimeoutCheckerInterval = 1000;
 
     private ProducerTemplate deadLetterProducerTemplate;
 
@@ -933,6 +933,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         return completeAllOnStop;
     }
 
+    public long getCompletionTimeoutCheckerInterval() {
+        return completionTimeoutCheckerInterval;
+    }
+
+    public void setCompletionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
+        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }
@@ -1383,7 +1391,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 shutdownTimeoutCheckerExecutorService = true;
             }
             // check for timed out aggregated messages once every second
-            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
+            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), getCompletionTimeoutCheckerInterval());
             // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
             // need to re-establish the timeout map so timeout can trigger
             restoreTimeoutMapFromAggregationRepository();