You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/02 14:31:43 UTC
svn commit: r1130521 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/test/java/org/apache/camel/processor/aggregator/
components/camel-hawtdb/src/main/java/org/...
Author: davsclaus
Date: Thu Jun 2 12:31:42 2011
New Revision: 1130521
URL: http://svn.apache.org/viewvc?rev=1130521&view=rev
Log:
CAMEL-4037: Aggregate EIP with only completion timeout condition will restore timeout map upon restart based on exchanges from aggregation repository.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
- copied, changed from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java
- copied, changed from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java
- copied, changed from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Jun 2 12:31:42 2011
@@ -74,6 +74,7 @@ public interface Exchange {
String AUTHENTICATION_FAILURE_POLICY_ID = "CamelAuthenticationFailurePolicyId";
String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType";
String AGGREGATED_SIZE = "CamelAggregatedSize";
+ String AGGREGATED_TIMEOUT = "CamelAggregatedTimeout";
String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey";
String AGGREGATION_STRATEGY = "CamelAggregationStrategy";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Jun 2 12:31:42 2011
@@ -54,6 +54,8 @@ import org.apache.camel.util.ExchangeHel
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.TimeoutMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +74,6 @@ import org.slf4j.LoggerFactory;
* messages for the same stock are combined (or just the latest message is used
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
- *
- * @version
*/
public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
@@ -198,7 +198,8 @@ public class AggregateProcessor extends
* @param key the correlation key
* @param exchange the exchange
* @return the aggregated exchange
- * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
+ * @throws org.apache.camel.CamelExchangeException
+ * is thrown if error aggregating
*/
private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException {
LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
@@ -275,8 +276,8 @@ public class AggregateProcessor extends
/**
* Tests whether the given exchange is complete or not
*
- * @param key the correlation key
- * @param exchange the incoming exchange
+ * @param key the correlation key
+ * @param exchange the incoming exchange
* @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
*/
protected String isCompleted(String key, Exchange exchange) {
@@ -311,7 +312,7 @@ public class AggregateProcessor extends
if (value != null && value > 0) {
LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
new Object[]{key, value, exchange});
- timeoutMap.put(key, exchange.getExchangeId(), value);
+ addExchangeToTimeoutMap(key, exchange, value);
timeoutSet = true;
}
}
@@ -319,10 +320,10 @@ public class AggregateProcessor extends
// timeout is used so use the timeout map to keep an eye on this
LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
new Object[]{key, getCompletionTimeout(), exchange});
- timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout());
+ addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
}
- if (isCompletionFromBatchConsumer()) {
+ if (isCompletionFromBatchConsumer()) {
batchConsumerCorrelationKeys.add(key);
batchConsumerCounter.incrementAndGet();
int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
@@ -400,6 +401,50 @@ public class AggregateProcessor extends
});
}
+ /**
+ * Restores the timeout map with timeout values from the aggregation repository.
+ * <p/>
+ * This is needed in case the aggregator has been stopped and started again (for example a server restart).
+ * Then the existing exchanges from the {@link AggregationRepository} must have its timeout conditions restored.
+ */
+ protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
+ StopWatch watch = new StopWatch();
+ LOG.trace("Starting restoring CompletionTimeout for existing exchanges from the aggregation repository...");
+
+ // grab the timeout value for each partly aggregated exchange
+ Set<String> keys = aggregationRepository.getKeys();
+ if (keys == null || keys.isEmpty()) {
+ return;
+ }
+
+ for (String key : keys) {
+ Exchange exchange = aggregationRepository.get(camelContext, key);
+ // grab the timeout value
+ long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
+ if (timeout > 0) {
+ LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
+ addExchangeToTimeoutMap(key, exchange, timeout);
+ }
+ }
+
+ // log duration of this task so end user can see how long it takes to pre-check this upon starting
+ LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
+ timeoutMap.size(), TimeUtils.printDuration(watch.stop()));
+ }
+
+ /**
+ * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
+ *
+ * @param key the correlation key
+ * @param exchange the exchange
+ * @param timeout the timeout value in millis
+ */
+ private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
+ // store the timeout value on the exchange as well, in case we need it later
+ exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
+ timeoutMap.put(key, exchange.getExchangeId(), timeout);
+ }
+
public Predicate getCompletionPredicate() {
return completionPredicate;
}
@@ -797,6 +842,9 @@ public class AggregateProcessor extends
ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
+ // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
+ // need to re-establish the timeout map so timeout can trigger
+ restoreTimeoutMapFromAggregationRepository();
ServiceHelper.startService(timeoutMap);
}
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011
@@ -18,13 +18,10 @@ package org.apache.camel.processor.aggre
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.camel.CamelExchangeException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
-import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultExchange;
@@ -32,12 +29,13 @@ import org.apache.camel.processor.BodyIn
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.ExceptionHandler;
/**
+ * To test CAMEL-4037 that a restart of aggregator can re-initialize the timeout map
+ *
* @version
*/
-public class AggregateProcessorTest extends ContextTestSupport {
+public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestSupport {
private ExecutorService executorService;
@@ -52,60 +50,18 @@ public class AggregateProcessorTest exte
executorService = Executors.newSingleThreadExecutor();
}
- public void testAggregateProcessorCompletionPredicate() throws Exception {
+ public void testAggregateProcessorTimeoutRestart() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+END");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
- Predicate complete = body().contains("END");
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionPredicate(complete);
- ap.setEagerCheckCompletion(false);
- ap.start();
-
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("END");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
- ap.process(e2);
- ap.process(e3);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
-
- ap.stop();
- }
-
- public void testAggregateProcessorCompletionPredicateEager() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+END");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
+ mock.expectedBodiesReceived("A+B");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
Processor done = new SendProcessor(context.getEndpoint("mock:result"));
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- Predicate complete = body().isEqualTo("END");
AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionPredicate(complete);
- ap.setEagerCheckCompletion(true);
+ // start with a high timeout so no completes before we stop
+ ap.setCompletionTimeout(2000);
ap.start();
Exchange e1 = new DefaultExchange(context);
@@ -116,83 +72,29 @@ public class AggregateProcessorTest exte
e2.getIn().setBody("B");
e2.getIn().setHeader("id", 123);
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("END");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
ap.process(e1);
ap.process(e2);
- ap.process(e3);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
+ // shutdown before the 2 sec timeout occurs
+ // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository,
ap.stop();
- }
-
- public void testAggregateProcessorCompletionAggregatedSize() throws Exception {
- doTestAggregateProcessorCompletionAggregatedSize(false);
- }
-
- public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception {
- doTestAggregateProcessorCompletionAggregatedSize(true);
- }
-
- private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+C");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "size");
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
+ // should be no completed
+ assertEquals(0, mock.getReceivedCounter());
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionSize(3);
- ap.setEagerCheckCompletion(eager);
+ // start aggregator again
ap.start();
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
- ap.process(e2);
- ap.process(e3);
- ap.process(e4);
-
+ // the aggregator should restore the timeout condition and trigger timeout
assertMockEndpointsSatisfied();
+ assertEquals(1, mock.getReceivedCounter());
- ap.stop();
- }
-
- public void testAggregateProcessorCompletionTimeout() throws Exception {
- doTestAggregateProcessorCompletionTimeout(false);
- }
-
- public void testAggregateProcessorCompletionTimeoutEager() throws Exception {
- doTestAggregateProcessorCompletionTimeout(true);
+ ap.shutdown();
}
- private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception {
+ public void testAggregateProcessorTimeoutExpressionRestart() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+C");
+ mock.expectedBodiesReceived("A+B");
mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
Processor done = new SendProcessor(context.getEndpoint("mock:result"));
@@ -200,363 +102,93 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionTimeout(3000);
- ap.setEagerCheckCompletion(eager);
- ap.start();
-
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
-
- Thread.sleep(250);
- ap.process(e2);
-
- Thread.sleep(500);
- ap.process(e3);
-
- Thread.sleep(5000);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
-
- ap.stop();
- }
-
- public void testAggregateCompletionInterval() throws Exception {
- // camel context must be started
- context.start();
-
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+C", "D");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionInterval(3000);
+ // start with a high timeout so no completes before we stop
+ ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
Exchange e1 = new DefaultExchange(context);
e1.getIn().setBody("A");
e1.getIn().setHeader("id", 123);
+ e1.getIn().setHeader("myTimeout", 2000);
Exchange e2 = new DefaultExchange(context);
e2.getIn().setBody("B");
e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
- ap.process(e2);
- ap.process(e3);
-
- Thread.sleep(5000);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
-
- ap.stop();
- }
-
- public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+C+END");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
- Predicate complete = body().contains("END");
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionPredicate(complete);
- ap.setIgnoreInvalidCorrelationKeys(true);
-
- ap.start();
-
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("END");
- e4.getIn().setHeader("id", 123);
+ e2.getIn().setHeader("myTimeout", 2000);
ap.process(e1);
ap.process(e2);
- ap.process(e3);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
-
- ap.stop();
- }
-
- public void testAggregateBadCorrelationKey() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+C+END");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
- Predicate complete = body().contains("END");
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionPredicate(complete);
-
- ap.start();
-
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("END");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
-
- try {
- ap.process(e2);
- fail("Should have thrown an exception");
- } catch (CamelExchangeException e) {
- assertEquals("Invalid correlation key. Exchange[Message: B]", e.getMessage());
- }
-
- ap.process(e3);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
+ // shutdown before the 2 sec timeout occurs
+ // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository,
ap.stop();
- }
- public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B+END");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
- Predicate complete = body().contains("END");
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionPredicate(complete);
- ap.setCloseCorrelationKeyOnCompletion(1000);
+ // should be no completed
+ assertEquals(0, mock.getReceivedCounter());
+ // start aggregator again
ap.start();
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("END");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("C");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
- ap.process(e2);
- ap.process(e3);
-
- try {
- ap.process(e4);
- fail("Should have thrown an exception");
- } catch (CamelExchangeException e) {
- assertEquals("The correlation key [123] has been closed. Exchange[Message: C]", e.getMessage());
- }
-
+ // the aggregator should restore the timeout condition and trigger timeout
assertMockEndpointsSatisfied();
+ assertEquals(1, mock.getReceivedCounter());
- ap.stop();
+ ap.shutdown();
}
- public void testAggregateUseBatchSizeFromConsumer() throws Exception {
+ public void testAggregateProcessorTwoTimeoutExpressionRestart() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B", "C+D+E");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "consumer");
+ mock.expectedBodiesReceived("C+D", "A+B");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
Processor done = new SendProcessor(context.getEndpoint("mock:result"));
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionSize(100);
- ap.setCompletionFromBatchConsumer(true);
-
+ // start with a high timeout so no completes before we stop
+ ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
Exchange e1 = new DefaultExchange(context);
e1.getIn().setBody("A");
e1.getIn().setHeader("id", 123);
- e1.setProperty(Exchange.BATCH_INDEX, 0);
- e1.setProperty(Exchange.BATCH_SIZE, 2);
- e1.setProperty(Exchange.BATCH_COMPLETE, false);
+ e1.getIn().setHeader("myTimeout", 3000);
Exchange e2 = new DefaultExchange(context);
e2.getIn().setBody("B");
e2.getIn().setHeader("id", 123);
- e2.setProperty(Exchange.BATCH_INDEX, 1);
- e2.setProperty(Exchange.BATCH_SIZE, 2);
- e2.setProperty(Exchange.BATCH_COMPLETE, true);
+ e2.getIn().setHeader("myTimeout", 3000);
Exchange e3 = new DefaultExchange(context);
e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
- e3.setProperty(Exchange.BATCH_INDEX, 0);
- e3.setProperty(Exchange.BATCH_SIZE, 3);
- e3.setProperty(Exchange.BATCH_COMPLETE, false);
+ e3.getIn().setHeader("id", 456);
+ e3.getIn().setHeader("myTimeout", 2000);
Exchange e4 = new DefaultExchange(context);
e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
- e4.setProperty(Exchange.BATCH_INDEX, 1);
- e4.setProperty(Exchange.BATCH_SIZE, 3);
- e4.setProperty(Exchange.BATCH_COMPLETE, false);
-
- Exchange e5 = new DefaultExchange(context);
- e5.getIn().setBody("E");
- e5.getIn().setHeader("id", 123);
- e5.setProperty(Exchange.BATCH_INDEX, 2);
- e5.setProperty(Exchange.BATCH_SIZE, 3);
- e5.setProperty(Exchange.BATCH_COMPLETE, true);
+ e4.getIn().setHeader("id", 456);
+ e4.getIn().setHeader("myTimeout", 2000);
ap.process(e1);
ap.process(e2);
ap.process(e3);
ap.process(e4);
- ap.process(e5);
-
- assertMockEndpointsSatisfied();
+ // shutdown before the 2 sec timeout occurs
+ // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository,
ap.stop();
- }
- public void testAggregateLogFailedExchange() throws Exception {
- doTestAggregateLogFailedExchange(null);
- }
-
- public void testAggregateHandleFailedExchange() throws Exception {
- final AtomicBoolean tested = new AtomicBoolean();
-
- ExceptionHandler myHandler = new ExceptionHandler() {
- public void handleException(Throwable exception) {
- }
-
- public void handleException(String message, Throwable exception) {
- }
-
- public void handleException(String message, Exchange exchange, Throwable exception) {
- assertEquals("Error processing aggregated exchange", message);
- assertEquals("B+Kaboom+END", exchange.getIn().getBody());
- assertEquals("Damn", exception.getMessage());
- tested.set(true);
- }
- };
-
- doTestAggregateLogFailedExchange(myHandler);
- assertEquals(true, tested.get());
- }
-
- private void doTestAggregateLogFailedExchange(ExceptionHandler handler) throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+END");
-
- Processor done = new Processor() {
- public void process(Exchange exchange) throws Exception {
- if (exchange.getIn().getBody(String.class).contains("Kaboom")) {
- throw new IllegalArgumentException("Damn");
- }
- // else send it further along
- SendProcessor send = new SendProcessor(context.getEndpoint("mock:result"));
- send.start();
- send.process(exchange);
- }
- };
-
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
+ // should be no completed
+ assertEquals(0, mock.getReceivedCounter());
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setEagerCheckCompletion(true);
- ap.setCompletionPredicate(body().isEqualTo("END"));
- if (handler != null) {
- ap.setExceptionHandler(handler);
- }
+ // start aggregator again
ap.start();
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 456);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("Kaboom");
- e3.getIn().setHeader("id", 456);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("END");
- e4.getIn().setHeader("id", 456);
-
- Exchange e5 = new DefaultExchange(context);
- e5.getIn().setBody("END");
- e5.getIn().setHeader("id", 123);
-
- ap.process(e1);
- ap.process(e2);
- ap.process(e3);
- ap.process(e4);
- ap.process(e5);
-
+ // the aggregator should restore the timeout condition and trigger timeout
assertMockEndpointsSatisfied();
+ assertEquals(2, mock.getReceivedCounter());
- ap.stop();
+ ap.shutdown();
}
-
}
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java Thu Jun 2 12:31:42 2011
@@ -17,7 +17,6 @@
package org.apache.camel.component.hawtdb;
import java.io.IOException;
-import java.io.Serializable;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -55,8 +54,9 @@ public final class HawtDBCamelCodec {
DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
// use DefaultExchangeHolder to marshal to a serialized object
DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
- // add the aggregated size property as the only property we want to retain
+ // add the aggregated size and timeout property as the only properties we want to retain
DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class));
// add the aggregated completed by property to retain
DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
// add the aggregated correlation key property to retain
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.hawtdb;
-import java.util.concurrent.TimeUnit;
-
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -25,7 +23,7 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateTimeoutCompletionRestartTest extends CamelTestSupport {
@Override
public void setUp() throws Exception {
@@ -34,27 +32,32 @@ public class HawtDBAggregateTest extends
}
@Test
- public void testHawtDBAggregate() throws Exception {
+ public void testHawtDBAggregateTimeoutCompletionRestart() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
- mock.expectedBodiesReceived("ABCDE");
+ mock.expectedMessageCount(0);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
- template.sendBodyAndHeader("direct:start", "D", "id", 123);
- template.sendBodyAndHeader("direct:start", "E", "id", 123);
- assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ // stop Camel
+ context.stop();
+ assertEquals(0, mock.getReceivedCounter());
+
+ // start Camel again, and the timeout should trigger a completion
+ context.start();
+
+ mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABC");
- // from endpoint should be preserved
- assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+ assertMockEndpointsSatisfied();
+ assertEquals(1, mock.getReceivedCounter());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
- // START SNIPPET: e1
public void configure() throws Exception {
// create the hawtdb repo
HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
@@ -63,10 +66,9 @@ public class HawtDBAggregateTest extends
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy())
// use our created hawtdb repo as aggregation repository
- .completionSize(5).aggregationRepository(repo)
+ .completionTimeout(3000).aggregationRepository(repo)
.to("mock:aggregated");
}
- // END SNIPPET: e1
};
}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java Thu Jun 2 12:31:42 2011
@@ -36,8 +36,9 @@ public final class JdbcCamelCodec {
public byte[] marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
// use DefaultExchangeHolder to marshal to a serialized object
DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
- // add the aggregated size property as the only property we want to retain
+ // add the aggregated size and timeout property as the only properties we want to retain
DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class));
// add the aggregated completed by property to retain
DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
// add the aggregated correlation key property to retain
Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java (from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java&r1=1130501&r2=1130521&rev=1130521&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011
@@ -16,29 +16,33 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
-import java.util.concurrent.TimeUnit;
-
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-public class JdbcAggregateTest extends AbstractJdbcAggregationTestSupport {
+public class JdbcAggregateTimeoutCompletionRestartTest extends AbstractJdbcAggregationTestSupport {
@Test
- public void testJdbcAggregate() throws Exception {
+ public void testJdbcAggregateTimeoutCompletionRestart() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
- mock.expectedBodiesReceived("ABCDE");
+ mock.expectedMessageCount(0);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
- template.sendBodyAndHeader("direct:start", "D", "id", 123);
- template.sendBodyAndHeader("direct:start", "E", "id", 123);
- assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ // stop Camel
+ context.stop();
+ assertEquals(0, mock.getReceivedCounter());
+
+ // start Camel again, and the timeout should trigger a completion
+ context.start();
+
+ mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABC");
- // from endpoint should be preserved
- assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+ assertMockEndpointsSatisfied();
+ assertEquals(1, mock.getReceivedCounter());
}
@Override
@@ -50,7 +54,7 @@ public class JdbcAggregateTest extends A
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy())
// use our created jdbc repo as aggregation repository
- .completionSize(5).aggregationRepository(repo)
+ .completionTimeout(3000).aggregationRepository(repo)
.to("mock:aggregated");
}
};