You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/10 19:00:00 UTC
svn commit: r932763 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/processor/aggregate/
components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/
Author: davsclaus
Date: Sat Apr 10 16:59:59 2010
New Revision: 932763
URL: http://svn.apache.org/viewvc?rev=932763&view=rev
Log:
CAMEL-2568: RecoverableAggregationRepository now has max redeliveries and dead letter channel.
Added:
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java
- copied, changed from r932757, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=932763&r1=932762&r2=932763&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Apr 10 16:59:59 2010
@@ -589,30 +589,42 @@ public class AggregateProcessor extends
// and mark it as redelivered
exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
- // update current redelivery state
+ // get the current redelivery data
RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
- if (data == null) {
- // create new data
- data = new RedeliveryData();
- redeliveryState.put(exchange.getExchangeId(), data);
- }
- data.redeliveryCounter++;
-
- // set redelivery counter
- exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
// if we are exhausted, then move to dead letter channel
- if (recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter > recoverable.getMaximumRedeliveries()) {
+ if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
+ " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+
+ // send to DLC
try {
+ // set redelivery counter
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
deadLetterProcessor.process(exchange);
- // confirm after it has been moved to dead letter channel
- recoverable.confirm(camelContext, exchangeId);
} catch (Exception e) {
- getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), e);
+ exchange.setException(e);
+ }
+
+ // handle if failed
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
+ } else {
+ // it was ok, so confirm after it has been moved to dead letter channel, so we won't recover it again
+ recoverable.confirm(camelContext, exchangeId);
}
} else {
+ // update current redelivery state
+ if (data == null) {
+ // create new data
+ data = new RedeliveryData();
+ redeliveryState.put(exchange.getExchangeId(), data);
+ }
+ data.redeliveryCounter++;
+
+ // set redelivery counter
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+
if (LOG.isDebugEnabled()) {
LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
}
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java (from r932757, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java&r1=932757&r2=932763&rev=932763&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java Sat Apr 10 16:59:59 2010
@@ -24,7 +24,7 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class HawtDBAggregateRecoverDeadLetterChannelTest extends CamelTestSupport {
+public class HawtDBAggregateRecoverDeadLetterChannelFailedTest extends CamelTestSupport {
private HawtDBAggregationRepository<String> repo;
@@ -37,7 +37,7 @@ public class HawtDBAggregateRecoverDeadL
// exhaust after at most 3 attempts
repo.setMaximumRedeliveries(3);
// and move to this dead letter channel
- repo.setDeadLetterUri("mock:dead");
+ repo.setDeadLetterUri("direct:dead");
// check faster
repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
@@ -45,13 +45,19 @@ public class HawtDBAggregateRecoverDeadL
}
@Test
- public void testHawtDBAggregateRecoverDeadLetterChannel() throws Exception {
+ public void testHawtDBAggregateRecoverDeadLetterChannelFailed() throws Exception {
// should fail all times
getMockEndpoint("mock:result").expectedMessageCount(0);
getMockEndpoint("mock:aggregated").expectedMessageCount(4);
- getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
+ // it should keep sending to DLC if it failed, so test for min 3 messages
+ getMockEndpoint("mock:dead").expectedMinimumMessageCount(3);
+ // every message should carry the same details: marked as redelivered, with a redelivery counter of 3
getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
- getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(4);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -75,6 +81,10 @@ public class HawtDBAggregateRecoverDeadL
.throwException(new IllegalArgumentException("Damn"))
.to("mock:result")
.end();
+
+ from("direct:dead")
+ .to("mock:dead")
+ .throwException(new IllegalArgumentException("We are dead"));
}
};
}
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java?rev=932763&r1=932762&r2=932763&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java Sat Apr 10 16:59:59 2010
@@ -51,7 +51,7 @@ public class HawtDBAggregateRecoverDeadL
getMockEndpoint("mock:aggregated").expectedMessageCount(4);
getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
- getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(4);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);