You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/02 14:31:43 UTC

svn commit: r1130521 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/ components/camel-hawtdb/src/main/java/org/...

Author: davsclaus
Date: Thu Jun  2 12:31:42 2011
New Revision: 1130521

URL: http://svn.apache.org/viewvc?rev=1130521&view=rev
Log:
CAMEL-4037: Aggregate EIP with only completion timeout condition will restore timeout map upon restart based on exchanges from aggregation repository.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
      - copied, changed from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java
      - copied, changed from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java
      - copied, changed from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Jun  2 12:31:42 2011
@@ -74,6 +74,7 @@ public interface Exchange {
     String AUTHENTICATION_FAILURE_POLICY_ID = "CamelAuthenticationFailurePolicyId";
     String ACCEPT_CONTENT_TYPE              = "CamelAcceptContentType";
     String AGGREGATED_SIZE                  = "CamelAggregatedSize";
+    String AGGREGATED_TIMEOUT               = "CamelAggregatedTimeout";
     String AGGREGATED_COMPLETED_BY          = "CamelAggregatedCompletedBy";
     String AGGREGATED_CORRELATION_KEY       = "CamelAggregatedCorrelationKey";
     String AGGREGATION_STRATEGY             = "CamelAggregationStrategy";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Jun  2 12:31:42 2011
@@ -54,6 +54,8 @@ import org.apache.camel.util.ExchangeHel
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
 import org.apache.camel.util.TimeoutMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +74,6 @@ import org.slf4j.LoggerFactory;
  * messages for the same stock are combined (or just the latest message is used
  * and older prices are discarded). Another idea is to combine line item messages
  * together into a single invoice message.
- *
- * @version 
  */
 public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
 
@@ -198,7 +198,8 @@ public class AggregateProcessor extends 
      * @param key      the correlation key
      * @param exchange the exchange
      * @return the aggregated exchange
-     * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
+     * @throws org.apache.camel.CamelExchangeException
+     *          is thrown if error aggregating
      */
     private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException {
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
@@ -275,8 +276,8 @@ public class AggregateProcessor extends 
     /**
      * Tests whether the given exchange is complete or not
      *
-     * @param key       the correlation key
-     * @param exchange  the incoming exchange
+     * @param key      the correlation key
+     * @param exchange the incoming exchange
      * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
      */
     protected String isCompleted(String key, Exchange exchange) {
@@ -311,7 +312,7 @@ public class AggregateProcessor extends 
             if (value != null && value > 0) {
                 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
                         new Object[]{key, value, exchange});
-                timeoutMap.put(key, exchange.getExchangeId(), value);
+                addExchangeToTimeoutMap(key, exchange, value);
                 timeoutSet = true;
             }
         }
@@ -319,10 +320,10 @@ public class AggregateProcessor extends 
             // timeout is used so use the timeout map to keep an eye on this
             LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
                     new Object[]{key, getCompletionTimeout(), exchange});
-            timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout());
+            addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
         }
 
-        if (isCompletionFromBatchConsumer()) {            
+        if (isCompletionFromBatchConsumer()) {
             batchConsumerCorrelationKeys.add(key);
             batchConsumerCounter.incrementAndGet();
             int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
@@ -400,6 +401,50 @@ public class AggregateProcessor extends 
         });
     }
 
+    /**
+     * Restores the timeout map with timeout values from the aggregation repository.
+     * <p/>
+     * This is needed in case the aggregator has been stopped and started again (for example a server restart).
+     * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored.
+     */
+    protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
+        StopWatch watch = new StopWatch();
+        LOG.trace("Starting restoring CompletionTimeout for existing exchanges from the aggregation repository...");
+
+        // grab the timeout value for each partly aggregated exchange
+        Set<String> keys = aggregationRepository.getKeys();
+        if (keys == null || keys.isEmpty()) {
+            return;
+        }
+
+        for (String key : keys) {
+            Exchange exchange = aggregationRepository.get(camelContext, key);
+            // grab the timeout value
+            long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
+            if (timeout > 0) {
+                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
+                addExchangeToTimeoutMap(key, exchange, timeout);
+            }
+        }
+
+        // log duration of this task so end user can see how long it takes to pre-check this upon starting
+        LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
+                timeoutMap.size(), TimeUtils.printDuration(watch.stop()));
+    }
+
+    /**
+     * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
+     *
+     * @param key      the correlation key
+     * @param exchange the exchange
+     * @param timeout  the timeout value in millis
+     */
+    private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
+        // store the timeout value on the exchange as well, in case we need it later
+        exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
+        timeoutMap.put(key, exchange.getExchangeId(), timeout);
+    }
+
     public Predicate getCompletionPredicate() {
         return completionPredicate;
     }
@@ -797,6 +842,9 @@ public class AggregateProcessor extends 
             ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
             // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
+            // restore existing timeout values from the aggregation repository; for example, after a restart we
+            // need to re-establish the timeout map so that the timeouts can still trigger
+            restoreTimeoutMapFromAggregationRepository();
             ServiceHelper.startService(timeoutMap);
         }
     }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java Thu Jun  2 12:31:42 2011
@@ -18,13 +18,10 @@ package org.apache.camel.processor.aggre
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultExchange;
@@ -32,12 +29,13 @@ import org.apache.camel.processor.BodyIn
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.ExceptionHandler;
 
 /**
+ * To test CAMEL-4037 that a restart of aggregator can re-initialize the timeout map
+ *
  * @version 
  */
-public class AggregateProcessorTest extends ContextTestSupport {
+public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestSupport {
 
     private ExecutorService executorService;
 
@@ -52,60 +50,18 @@ public class AggregateProcessorTest exte
         executorService = Executors.newSingleThreadExecutor();
     }
 
-    public void testAggregateProcessorCompletionPredicate() throws Exception {
+    public void testAggregateProcessorTimeoutRestart() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+END");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-        Predicate complete = body().contains("END");
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionPredicate(complete);
-        ap.setEagerCheckCompletion(false);
-        ap.start();
-
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("END");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        ap.process(e2);
-        ap.process(e3);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
-
-        ap.stop();
-    }
-
-    public void testAggregateProcessorCompletionPredicateEager() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+END");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
+        mock.expectedBodiesReceived("A+B");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
-        Predicate complete = body().isEqualTo("END");
 
         AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionPredicate(complete);
-        ap.setEagerCheckCompletion(true);
+        // start with a high timeout so that nothing completes before we stop
+        ap.setCompletionTimeout(2000);
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
@@ -116,83 +72,29 @@ public class AggregateProcessorTest exte
         e2.getIn().setBody("B");
         e2.getIn().setHeader("id", 123);
 
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("END");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
         ap.process(e1);
         ap.process(e2);
-        ap.process(e3);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
 
+        // stop the aggregator before the 2 sec timeout occurs
+        // (we use stop rather than shutdown, as shutdown would clear the in-memory aggregation repository)
         ap.stop();
-    }
-
-    public void testAggregateProcessorCompletionAggregatedSize() throws Exception {
-        doTestAggregateProcessorCompletionAggregatedSize(false);
-    }
-
-    public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception {
-        doTestAggregateProcessorCompletionAggregatedSize(true);
-    }
-
-    private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+C");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "size");
 
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
+        // nothing should have completed yet
+        assertEquals(0, mock.getReceivedCounter());
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionSize(3);
-        ap.setEagerCheckCompletion(eager);
+        // start aggregator again
         ap.start();
 
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        ap.process(e2);
-        ap.process(e3);
-        ap.process(e4);
-
+        // the aggregator should restore the timeout condition and trigger timeout
         assertMockEndpointsSatisfied();
+        assertEquals(1, mock.getReceivedCounter());
 
-        ap.stop();
-    }
-
-    public void testAggregateProcessorCompletionTimeout() throws Exception {
-        doTestAggregateProcessorCompletionTimeout(false);
-    }
-
-    public void testAggregateProcessorCompletionTimeoutEager() throws Exception {
-        doTestAggregateProcessorCompletionTimeout(true);
+        ap.shutdown();
     }
 
-    private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception {
+    public void testAggregateProcessorTimeoutExpressionRestart() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+C");
+        mock.expectedBodiesReceived("A+B");
         mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
@@ -200,363 +102,93 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
         AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionTimeout(3000);
-        ap.setEagerCheckCompletion(eager);
-        ap.start();
-
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-
-        Thread.sleep(250);
-        ap.process(e2);
-
-        Thread.sleep(500);
-        ap.process(e3);
-
-        Thread.sleep(5000);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
-
-        ap.stop();
-    }
-
-    public void testAggregateCompletionInterval() throws Exception {
-        // camel context must be started
-        context.start();
-
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+C", "D");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionInterval(3000);
+        // the timeout is taken from the myTimeout header; it is set high so that nothing completes before we stop
+        ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
         e1.getIn().setBody("A");
         e1.getIn().setHeader("id", 123);
+        e1.getIn().setHeader("myTimeout", 2000);
 
         Exchange e2 = new DefaultExchange(context);
         e2.getIn().setBody("B");
         e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        ap.process(e2);
-        ap.process(e3);
-
-        Thread.sleep(5000);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
-
-        ap.stop();
-    }
-
-    public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+C+END");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-        Predicate complete = body().contains("END");
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionPredicate(complete);
-        ap.setIgnoreInvalidCorrelationKeys(true);
-
-        ap.start();
-
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("END");
-        e4.getIn().setHeader("id", 123);
+        e2.getIn().setHeader("myTimeout", 2000);
 
         ap.process(e1);
         ap.process(e2);
-        ap.process(e3);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
-
-        ap.stop();
-    }
-
-    public void testAggregateBadCorrelationKey() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+C+END");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-        Predicate complete = body().contains("END");
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionPredicate(complete);
-
-        ap.start();
-
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("END");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-
-        try {
-            ap.process(e2);
-            fail("Should have thrown an exception");
-        } catch (CamelExchangeException e) {
-            assertEquals("Invalid correlation key. Exchange[Message: B]", e.getMessage());
-        }
-
-        ap.process(e3);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
 
+        // stop the aggregator before the 2 sec timeout occurs
+        // (we use stop rather than shutdown, as shutdown would clear the in-memory aggregation repository)
         ap.stop();
-    }
 
-    public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B+END");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-        Predicate complete = body().contains("END");
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionPredicate(complete);
-        ap.setCloseCorrelationKeyOnCompletion(1000);
+        // nothing should have completed yet
+        assertEquals(0, mock.getReceivedCounter());
 
+        // start aggregator again
         ap.start();
 
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("END");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("C");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        ap.process(e2);
-        ap.process(e3);
-
-        try {
-            ap.process(e4);
-            fail("Should have thrown an exception");
-        } catch (CamelExchangeException e) {
-            assertEquals("The correlation key [123] has been closed. Exchange[Message: C]", e.getMessage());
-        }
-
+        // the aggregator should restore the timeout condition and trigger timeout
         assertMockEndpointsSatisfied();
+        assertEquals(1, mock.getReceivedCounter());
 
-        ap.stop();
+        ap.shutdown();
     }
 
-    public void testAggregateUseBatchSizeFromConsumer() throws Exception {
+    public void testAggregateProcessorTwoTimeoutExpressionRestart() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B", "C+D+E");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "consumer");
+        mock.expectedBodiesReceived("C+D", "A+B");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
         AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionSize(100);
-        ap.setCompletionFromBatchConsumer(true);
-
+        // the timeouts are taken from the myTimeout header; they are set high so that nothing completes before we stop
+        ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
         e1.getIn().setBody("A");
         e1.getIn().setHeader("id", 123);
-        e1.setProperty(Exchange.BATCH_INDEX, 0);
-        e1.setProperty(Exchange.BATCH_SIZE, 2);
-        e1.setProperty(Exchange.BATCH_COMPLETE, false);
+        e1.getIn().setHeader("myTimeout", 3000);
 
         Exchange e2 = new DefaultExchange(context);
         e2.getIn().setBody("B");
         e2.getIn().setHeader("id", 123);
-        e2.setProperty(Exchange.BATCH_INDEX, 1);
-        e2.setProperty(Exchange.BATCH_SIZE, 2);
-        e2.setProperty(Exchange.BATCH_COMPLETE, true);
+        e2.getIn().setHeader("myTimeout", 3000);
 
         Exchange e3 = new DefaultExchange(context);
         e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-        e3.setProperty(Exchange.BATCH_INDEX, 0);
-        e3.setProperty(Exchange.BATCH_SIZE, 3);
-        e3.setProperty(Exchange.BATCH_COMPLETE, false);
+        e3.getIn().setHeader("id", 456);
+        e3.getIn().setHeader("myTimeout", 2000);
 
         Exchange e4 = new DefaultExchange(context);
         e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-        e4.setProperty(Exchange.BATCH_INDEX, 1);
-        e4.setProperty(Exchange.BATCH_SIZE, 3);
-        e4.setProperty(Exchange.BATCH_COMPLETE, false);
-
-        Exchange e5 = new DefaultExchange(context);
-        e5.getIn().setBody("E");
-        e5.getIn().setHeader("id", 123);
-        e5.setProperty(Exchange.BATCH_INDEX, 2);
-        e5.setProperty(Exchange.BATCH_SIZE, 3);
-        e5.setProperty(Exchange.BATCH_COMPLETE, true);
+        e4.getIn().setHeader("id", 456);
+        e4.getIn().setHeader("myTimeout", 2000);
 
         ap.process(e1);
         ap.process(e2);
         ap.process(e3);
         ap.process(e4);
-        ap.process(e5);
-
-        assertMockEndpointsSatisfied();
 
+        // stop the aggregator before the shortest (2 sec) timeout occurs
+        // (we use stop rather than shutdown, as shutdown would clear the in-memory aggregation repository)
         ap.stop();
-    }
 
-    public void testAggregateLogFailedExchange() throws Exception {
-        doTestAggregateLogFailedExchange(null);
-    }
-
-    public void testAggregateHandleFailedExchange() throws Exception {
-        final AtomicBoolean tested = new AtomicBoolean();
-
-        ExceptionHandler myHandler = new ExceptionHandler() {
-            public void handleException(Throwable exception) {
-            }
-
-            public void handleException(String message, Throwable exception) {
-            }
-
-            public void handleException(String message, Exchange exchange, Throwable exception) {
-                assertEquals("Error processing aggregated exchange", message);
-                assertEquals("B+Kaboom+END", exchange.getIn().getBody());
-                assertEquals("Damn", exception.getMessage());
-                tested.set(true);
-            }
-        };
-
-        doTestAggregateLogFailedExchange(myHandler);
-        assertEquals(true, tested.get());
-    }
-
-    private void doTestAggregateLogFailedExchange(ExceptionHandler handler) throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+END");
-
-        Processor done = new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                if (exchange.getIn().getBody(String.class).contains("Kaboom")) {
-                    throw new IllegalArgumentException("Damn");
-                }
-                // else send it further along
-                SendProcessor send = new SendProcessor(context.getEndpoint("mock:result"));
-                send.start();
-                send.process(exchange);
-            }
-        };
-                
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
+        // nothing should have completed yet
+        assertEquals(0, mock.getReceivedCounter());
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setEagerCheckCompletion(true);
-        ap.setCompletionPredicate(body().isEqualTo("END"));
-        if (handler != null) {
-            ap.setExceptionHandler(handler);
-        }
+        // start aggregator again
         ap.start();
 
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 456);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("Kaboom");
-        e3.getIn().setHeader("id", 456);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("END");
-        e4.getIn().setHeader("id", 456);
-
-        Exchange e5 = new DefaultExchange(context);
-        e5.getIn().setBody("END");
-        e5.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        ap.process(e2);
-        ap.process(e3);
-        ap.process(e4);
-        ap.process(e5);
-
+        // the aggregator should restore the timeout condition and trigger timeout
         assertMockEndpointsSatisfied();
+        assertEquals(2, mock.getReceivedCounter());
 
-        ap.stop();
+        ap.shutdown();
     }
-
 }

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java Thu Jun  2 12:31:42 2011
@@ -17,7 +17,6 @@
 package org.apache.camel.component.hawtdb;
 
 import java.io.IOException;
-import java.io.Serializable;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -55,8 +54,9 @@ public final class HawtDBCamelCodec {
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
         // use DefaultExchangeHolder to marshal to a serialized object
         DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
-        // add the aggregated size property as the only property we want to retain
+        // add the aggregated size and timeout property as the only properties we want to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class));
         // add the aggregated completed by property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java Thu Jun  2 12:31:42 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.hawtdb;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -25,7 +23,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateTimeoutCompletionRestartTest extends CamelTestSupport {
 
     @Override
     public void setUp() throws Exception {
@@ -34,27 +32,32 @@ public class HawtDBAggregateTest extends
     }
 
     @Test
-    public void testHawtDBAggregate() throws Exception {
+    public void testHawtDBAggregateTimeoutCompletionRestart() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("ABCDE");
+        mock.expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
-        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+        // stop Camel
+        context.stop();
+        assertEquals(0, mock.getReceivedCounter());
+
+        // start Camel again, and the timeout should trigger a completion
+        context.start();
+
+        mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABC");
 
-        // from endpoint should be preserved
-        assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+        assertMockEndpointsSatisfied();
+        assertEquals(1, mock.getReceivedCounter());
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
-            // START SNIPPET: e1
             public void configure() throws Exception {
                 // create the hawtdb repo
                 HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
@@ -63,10 +66,9 @@ public class HawtDBAggregateTest extends
                 from("direct:start")
                     .aggregate(header("id"), new MyAggregationStrategy())
                         // use our created hawtdb repo as aggregation repository
-                        .completionSize(5).aggregationRepository(repo)
+                        .completionTimeout(3000).aggregationRepository(repo)
                         .to("mock:aggregated");
             }
-            // END SNIPPET: e1
         };
     }
 

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java Thu Jun  2 12:31:42 2011
@@ -36,8 +36,9 @@ public final class JdbcCamelCodec {
     public byte[] marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
         // use DefaultExchangeHolder to marshal to a serialized object
         DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
-        // add the aggregated size property as the only property we want to retain
+        // add the aggregated size and timeout property as the only properties we want to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class));
         // add the aggregated completed by property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain

Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java (from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java&r1=1130501&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java Thu Jun  2 12:31:42 2011
@@ -16,29 +16,33 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-public class JdbcAggregateTest extends AbstractJdbcAggregationTestSupport {
+public class JdbcAggregateTimeoutCompletionRestartTest extends AbstractJdbcAggregationTestSupport {
 
     @Test
-    public void testJdbcAggregate() throws Exception {
+    public void testJdbcAggregateTimeoutCompletionRestart() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("ABCDE");
+        mock.expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
-        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+        // stop Camel
+        context.stop();
+        assertEquals(0, mock.getReceivedCounter());
+
+        // start Camel again, and the timeout should trigger a completion
+        context.start();
+
+        mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABC");
 
-        // from endpoint should be preserved
-        assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+        assertMockEndpointsSatisfied();
+        assertEquals(1, mock.getReceivedCounter());
     }
 
     @Override
@@ -50,7 +54,7 @@ public class JdbcAggregateTest extends A
                 from("direct:start")
                         .aggregate(header("id"), new MyAggregationStrategy())
                                 // use our created jdbc repo as aggregation repository
-                        .completionSize(5).aggregationRepository(repo)
+                        .completionTimeout(3000).aggregationRepository(repo)
                         .to("mock:aggregated");
             }
         };