You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/28 08:43:43 UTC
svn commit: r1002036 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java
Author: davsclaus
Date: Tue Sep 28 06:43:42 2010
New Revision: 1002036
URL: http://svn.apache.org/viewvc?rev=1002036&view=rev
Log:
CAMEL-3159: discard on timeout in Aggregate EIP. Added test with hawtdb as well.
Added:
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java
- copied, changed from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1002036&r1=1002035&r2=1002036&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Sep 28 06:43:42 2010
@@ -365,6 +365,10 @@ public class AggregateProcessor extends
if (LOG.isDebugEnabled()) {
LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange);
}
+ // must confirm the discarded exchange
+ aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
+ // and remove redelivery state as well
+ redeliveryState.remove(exchange.getExchangeId());
return;
}
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java (from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=1002017&r2=1002036&rev=1002036&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java Tue Sep 28 06:43:42 2010
@@ -20,13 +20,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class HawtDBAggregateRecoverTest extends CamelTestSupport {
+public class HawtDBAggregateDiscardOnTimeoutTest extends CamelTestSupport {
private static AtomicInteger counter = new AtomicInteger(0);
private HawtDBAggregationRepository repo;
@@ -43,22 +43,28 @@ public class HawtDBAggregateRecoverTest
}
@Test
- public void testHawtDBAggregateRecover() throws Exception {
- // should fail the first 2 times and then recover
- getMockEndpoint("mock:aggregated").expectedMessageCount(3);
- getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
- // should be marked as redelivered
- getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
- // on the 2nd redelivery attempt we success
- getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ public void testAggregateDiscardOnTimeout() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ // wait 3 seconds
+ Thread.sleep(3000);
+
+ mock.assertIsSatisfied();
+
+ // now send 3 messages, which complete by size before the timeout occurs
+ mock.reset();
+ mock.expectedBodiesReceived("C+D+E");
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
- template.sendBodyAndHeader("direct:start", "D", "id", 123);
- template.sendBodyAndHeader("direct:start", "E", "id", 123);
- assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ // should complete before timeout
+ mock.await(1500, TimeUnit.MILLISECONDS);
}
@Override
@@ -68,21 +74,12 @@ public class HawtDBAggregateRecoverTest
public void configure() throws Exception {
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy())
- .completionSize(5).aggregationRepository(repo)
- .log("aggregated exchange id ${exchangeId} with ${body}")
- .to("mock:aggregated")
- .delay(1000)
- // simulate errors the first two times
- .process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- int count = counter.incrementAndGet();
- if (count <= 2) {
- throw new IllegalArgumentException("Damn");
- }
- }
- })
- .to("mock:result")
- .end();
+ .completionSize(3).aggregationRepository(repo)
+ // use a 2 second timeout
+ .completionTimeout(2000)
+ // and if timeout occurred then just discard the aggregated message
+ .discardOnCompletionTimeout()
+ .to("mock:aggregated");
}
};
}