You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/07 09:58:19 UTC

svn commit: r1310691 - in /camel/branches/camel-2.9.x: ./ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Sat Apr  7 07:58:18 2012
New Revision: 1310691

URL: http://svn.apache.org/viewvc?rev=1310691&view=rev
Log:
CAMEL-5148: Aggregate EIP now supports TimeoutAwareAggregationStrategy.

Added:
    camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
      - copied unchanged from r1310690, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
    svn:mergeinfo = /camel/trunk:1310690

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310691&r1=1310690&r2=1310691&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Apr  7 07:58:18 2012
@@ -371,6 +371,15 @@ public class AggregateProcessor extends 
             closedCorrelationKeys.put(key, key);
         }
 
+        if (fromTimeout) {
+            // invoke timeout if it's a timeout-aware aggregation strategy,
+            // to allow any custom processing before discarding the exchange
+            if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+                long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
+                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange, -1, -1, timeout);
+            }
+        }
+
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
             // discard due timeout
             LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: ()", key, exchange);

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310691&r1=1310690&r2=1310691&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java Sat Apr  7 07:58:18 2012
@@ -32,9 +32,9 @@ public interface TimeoutAwareAggregation
      *
      * @param oldExchange  the current aggregated exchange, or the original {@link Exchange} if no aggregation
      *                     has been done before the timeout occurred
-     * @param index        the index
-     * @param total        the total
-     * @param timeout      the timeout value in millis
+     * @param index        the index, may be <tt>-1</tt> if not possible to determine the index
+     * @param total        the total, may be <tt>-1</tt> if not possible to determine the total
+     * @param timeout      the timeout value in millis, may be <tt>-1</tt> if not possible to determine the timeout
      */
     void timeout(Exchange oldExchange, int index, int total, long timeout);
 }