You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/24 10:49:43 UTC
svn commit: r1052470 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/main/java/org/apache/camel/processor/l...
Author: davsclaus
Date: Fri Dec 24 09:49:43 2010
New Revision: 1052470
URL: http://svn.apache.org/viewvc?rev=1052470&view=rev
Log:
CAMEL-3460: Added RedeliveryMaxCounter header to the message when Camel performs a redelivery, so the end user can know when it's the last redelivery attempt.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Dec 24 09:49:43 2010
@@ -143,12 +143,13 @@ public interface Exchange {
String ON_COMPLETION = "CamelOnCompletion";
- String REDELIVERED = "CamelRedelivered";
- String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
- String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
- String ROLLBACK_ONLY = "CamelRollbackOnly";
- String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
- String ROUTE_STOP = "CamelRouteStop";
+ String REDELIVERED = "CamelRedelivered";
+ String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
+ String REDELIVERY_MAX_COUNTER = "CamelRedeliveryMaxCounter";
+ String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
+ String ROLLBACK_ONLY = "CamelRollbackOnly";
+ String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
+ String ROUTE_STOP = "CamelRouteStop";
String SOAP_ACTION = "CamelSoapAction";
String SKIP_GZIP_ENCODING = "CamelSkipGzipEncoding";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Fri Dec 24 09:49:43 2010
@@ -480,6 +480,7 @@ public abstract class RedeliveryErrorHan
// its continued then remove traces of redelivery attempted and caught exception
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+ exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
// keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
// create log message
@@ -534,7 +535,7 @@ public abstract class RedeliveryErrorHan
+ ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
logFailedDelivery(true, false, false, exchange, msg, data, e);
- data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+ data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
}
/**
@@ -579,6 +580,7 @@ public abstract class RedeliveryErrorHan
// its handled then remove traces of redelivery attempted
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+ exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
handled = true;
} else {
// must decrement the redelivery counter as we didn't process the redelivery but is
@@ -806,7 +808,7 @@ public abstract class RedeliveryErrorHan
* Increments the redelivery counter and adds the redelivered flag if the
* message has been redelivered
*/
- private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
+ private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
Message in = exchange.getIn();
Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
int next = 1;
@@ -815,6 +817,10 @@ public abstract class RedeliveryErrorHan
}
in.setHeader(Exchange.REDELIVERY_COUNTER, next);
in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+ // if maximum redeliveries is used, then provide that information as well
+ if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
+ in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
+ }
return next;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Dec 24 09:49:43 2010
@@ -738,6 +738,9 @@ public class AggregateProcessor extends
// set redelivery counter
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+ if (recoverable.getMaximumRedeliveries() > 0) {
+ exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Fri Dec 24 09:49:43 2010
@@ -209,6 +209,7 @@ public class FailOverLoadBalancer extend
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null);
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+ exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
}
private boolean processExchange(Processor processor, Exchange exchange,
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Fri Dec 24 09:49:43 2010
@@ -64,8 +64,8 @@ public class DeadLetterChannelOnRedelive
// MyRedeliveryProcessor before a redelivery is
// attempted. This allows us to alter the message before
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5)
- .onRedelivery(new MyRedeliverPrcessor())
- // setting delay to zero is just to make unit teting faster
+ .onRedelivery(new MyRedeliverProcessor())
+ // setting delay to zero is just to make unit testing faster
.redeliveryDelay(0L));
// END SNIPPET: e1
@@ -86,7 +86,7 @@ public class DeadLetterChannelOnRedelive
// START SNIPPET: e2
// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
- public class MyRedeliverPrcessor implements Processor {
+ public class MyRedeliverProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
// the message is being redelivered so we can alter it
@@ -94,9 +94,13 @@ public class DeadLetterChannelOnRedelive
// we just append the redelivery counter to the body
// you can of course do all kind of stuff instead
String body = exchange.getIn().getBody(String.class);
- int count = exchange.getIn().getHeader("CamelRedeliveryCounter", Integer.class);
+ int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
exchange.getIn().setBody(body + count);
+
+ // the maximum redelivery was set to 5
+ int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
+ assertEquals(5, max);
}
}
// END SNIPPET: e2
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -39,6 +39,7 @@ public class DeadLetterChannelTest exten
successEndpoint.expectedBodiesReceived(body);
successEndpoint.message(0).header(Exchange.REDELIVERED).isEqualTo(true);
successEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(1);
+ successEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(2);
deadEndpoint.expectedMessageCount(0);
@@ -54,6 +55,7 @@ public class DeadLetterChannelTest exten
// no traces of redelivery as the dead letter channel will handle the exception when moving the DLQ
deadEndpoint.message(0).header(Exchange.REDELIVERED).isNull();
deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isNull();
+ deadEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
successEndpoint.expectedMessageCount(0);
sendBody("direct:start", body);
@@ -77,6 +79,7 @@ public class DeadLetterChannelTest exten
// no traces of redelivery as the dead letter channel will handle the exception when moving the DLQ
deadEndpoint.message(0).header(Exchange.REDELIVERED).isNull();
deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isNull();
+ deadEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
successEndpoint.expectedMessageCount(0);
template.requestBody("direct:start", body);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java Fri Dec 24 09:49:43 2010
@@ -46,6 +46,7 @@ public class RedeliveryPolicyPerExceptio
log.info("Found message with headers: " + in.getHeaders());
assertMessageHeader(in, Exchange.REDELIVERY_COUNTER, 2);
+ assertMessageHeader(in, Exchange.REDELIVERY_MAX_COUNTER, 2);
assertMessageHeader(in, Exchange.REDELIVERED, true);
}
@@ -63,6 +64,7 @@ public class RedeliveryPolicyPerExceptio
log.info("Found message with headers: " + in.getHeaders());
assertMessageHeader(in, Exchange.REDELIVERY_COUNTER, 0);
+ assertMessageHeader(in, Exchange.REDELIVERY_MAX_COUNTER, null);
assertMessageHeader(in, Exchange.REDELIVERED, false);
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -60,6 +60,7 @@ public class AsyncDeadLetterChannelTest
mock.expectedMessageCount(1);
mock.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
mock.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ mock.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(2);
try {
template.requestBody("direct:in", "Hello World");
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -48,10 +48,23 @@ public class HawtDBAggregateRecoverDeadL
public void testHawtDBAggregateRecoverDeadLetterChannel() throws Exception {
// should fail all times
getMockEndpoint("mock:result").expectedMessageCount(0);
+
getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+ getMockEndpoint("mock:aggregated").message(0).header(Exchange.REDELIVERED).isNull();
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(1);
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+
getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java Fri Dec 24 09:49:43 2010
@@ -54,6 +54,7 @@ public class HawtDBAggregateRecoverWithR
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
// on the 2nd redelivery attempt we success
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);