You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/11/27 11:34:27 UTC
svn commit: r721149 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/onexception/
Author: davsclaus
Date: Thu Nov 27 02:34:26 2008
New Revision: 721149
URL: http://svn.apache.org/viewvc?rev=721149&view=rev
Log:
CAMEL-102: Added a retryUntil predicate to onException so end users have fine-grained control over when retrying should stop.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java (contents, props changed)
- copied, changed from r720962, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java Thu Nov 27 02:34:26 2008
@@ -55,6 +55,8 @@
private List<String> exceptions = new ArrayList<String>();
@XmlElement(name = "onWhen", required = false)
private WhenType onWhen;
+ @XmlElement(name = "retryUntil", required = false)
+ private ExpressionSubElementType retryUntil;
@XmlElement(name = "redeliveryPolicy", required = false)
private RedeliveryPolicyType redeliveryPolicy;
@XmlElement(name = "handled", required = false)
@@ -67,6 +69,8 @@
private Processor errorHandler;
@XmlTransient
private Predicate handledPolicy;
+ @XmlTransient
+ private Predicate retryUntilPolicy;
public ExceptionType() {
}
@@ -106,6 +110,7 @@
public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
setHandledFromExpressionType(routeContext);
+ setRetryUntilFromExpressionType(routeContext);
// lets attach a processor to an error handler
errorHandler = routeContext.createProcessor(this);
ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
@@ -192,6 +197,28 @@
}
/**
+ * Sets the retry until predicate.
+ *
+ * @param until predicate that returns true to keep retrying and false to stop
+ * @return the builder
+ */
+ public ExceptionType retryUntil(Predicate until) {
+ setRetryUntilPolicy(until);
+ return this;
+ }
+
+ /**
+ * Sets the retry until expression.
+ *
+ * @param until expression that evaluates to true to keep retrying and false to stop
+ * @return the builder
+ */
+ public ExceptionType retryUntil(Expression until) {
+ setRetryUntilPolicy(toPredicate(until));
+ return this;
+ }
+
+ /**
* Sets the back off multiplier
*
* @param backOffMultiplier the back off multiplier
@@ -356,12 +383,6 @@
public ExpressionSubElementType getHandled() {
return handled;
}
-
- private void setHandledFromExpressionType(RouteContext routeContext) {
- if (getHandled() != null && handledPolicy == null && routeContext != null) {
- handled(getHandled().createPredicate(routeContext));
- }
- }
public void setHandledPolicy(Predicate handledPolicy) {
this.handledPolicy = handledPolicy;
@@ -375,6 +396,22 @@
this.onWhen = onWhen;
}
+ public ExpressionSubElementType getRetryUntil() {
+ return retryUntil;
+ }
+
+ public void setRetryUntil(ExpressionSubElementType retryUntil) {
+ this.retryUntil = retryUntil;
+ }
+
+ public Predicate getRetryUntilPolicy() {
+ return retryUntilPolicy;
+ }
+
+ public void setRetryUntilPolicy(Predicate retryUntilPolicy) {
+ this.retryUntilPolicy = retryUntilPolicy;
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
protected RedeliveryPolicyType getOrCreateRedeliveryPolicy() {
@@ -393,4 +430,18 @@
}
return answer;
}
+
+
+ private void setHandledFromExpressionType(RouteContext routeContext) {
+ if (getHandled() != null && handledPolicy == null && routeContext != null) {
+ handled(getHandled().createPredicate(routeContext));
+ }
+ }
+
+ private void setRetryUntilFromExpressionType(RouteContext routeContext) {
+ if (getRetryUntil() != null && retryUntilPolicy == null && routeContext != null) {
+ retryUntil(getRetryUntil().createPredicate(routeContext));
+ }
+ }
+
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Nov 27 02:34:26 2008
@@ -61,6 +61,7 @@
long redeliveryDelay;
boolean sync = true;
Predicate handledPredicate;
+ Predicate retryUntilPredicate;
// default behavior which can be overloaded on a per exception basis
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
@@ -91,11 +92,18 @@
return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
}
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
public boolean process(Exchange exchange, final AsyncCallback callback) {
return process(exchange, callback, new RedeliveryData());
}
- public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+ /**
+ * Processes the exchange decorated with this dead letter channel.
+ */
+ protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
while (true) {
// we can't keep retrying if the route is being shutdown.
@@ -127,18 +135,21 @@
if (exceptionPolicy != null) {
data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
data.handledPredicate = exceptionPolicy.getHandledPolicy();
+ data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
Processor processor = exceptionPolicy.getErrorHandler();
if (processor != null) {
data.failureProcessor = processor;
}
}
- logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+ logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
}
- // should we redeliver or not?
- if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ // compute if we should redeliver or not
+ boolean shouldRedeliver = shouldRedeliver(exchange, data);
+ if (!shouldRedeliver) {
// the redelivery did not succeed, so now we let the failure processor handle it
setFailureHandled(exchange);
// must decrement the redelivery counter as we didn't process the redelivery but is
@@ -155,11 +166,12 @@
// The line below shouldn't be needed, it is invoked by the AsyncCallback above
//restoreExceptionOnExchange(exchange, data.handledPredicate);
- logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
+ logFailedDelivery(false, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". Handled by the failure processor: " + data.failureProcessor, data, null);
return sync;
}
- // should we redeliver
+ // if we are redelivering then sleep before trying again
if (data.redeliveryCounter > 0) {
// okay we will give it another go so clear the exception so we can try again
if (exchange.getException() != null) {
@@ -201,53 +213,20 @@
}
- private void logFailedDelivery(String message, RedeliveryData data, Throwable e) {
- LoggingLevel newLogLevel = null;
- if (data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
- newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
- } else {
- newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
- }
- if (e != null) {
- logger.log(message, e, newLogLevel);
- } else {
- logger.log(message, newLogLevel);
- }
- }
+ // Properties
+ // -------------------------------------------------------------------------
public static boolean isFailureHandled(Exchange exchange) {
- return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
+ return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
|| exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
}
public static void setFailureHandled(Exchange exchange) {
exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
- exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
+ exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
exchange.setException(null);
}
- protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
- if (handledPredicate == null || !handledPredicate.matches(exchange)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
- }
- // exception not handled, put exception back in the exchange
- exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
- }
- exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
- }
- }
-
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- // Properties
- // -------------------------------------------------------------------------
-
/**
* Returns the output processor
*/
@@ -287,8 +266,42 @@
}
// Implementation methods
+
// -------------------------------------------------------------------------
+ protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
+ if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
+ }
+ // exception not handled, put exception back in the exchange
+ exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
+ }
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
+ }
+ }
+
+ private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
+ LoggingLevel newLogLevel;
+ if (shouldRedeliver) {
+ newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+ } else {
+ newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+ }
+ if (e != null) {
+ logger.log(message, e, newLogLevel);
+ } else {
+ logger.log(message, newLogLevel);
+ }
+ }
+
+ private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
+ return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
+ }
+
/**
* Increments the redelivery counter and adds the redelivered flag if the
* message has been redelivered
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Thu Nov 27 02:34:26 2008
@@ -16,9 +16,10 @@
*/
package org.apache.camel.processor;
-import java.io.Serializable;
import java.util.Random;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
import org.apache.camel.model.LoggingLevel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,10 +90,21 @@
/**
* Returns true if the policy decides that the message exchange should be
- * redelivered
- */
- public boolean shouldRedeliver(int redeliveryCounter) {
+ * redelivered.
+ *
+ * @param exchange the current exchange
+ * @param redeliveryCounter the current retry counter
+ * @param retryUntil an optional predicate to determine if we should redeliver or not
+ * @return true to redeliver, false to stop
+ */
+ public boolean shouldRedeliver(Exchange exchange, int redeliveryCounter, Predicate retryUntil) {
+ // predicate is always used if provided
+ if (retryUntil != null) {
+ return retryUntil.matches(exchange);
+ }
+
if (getMaximumRedeliveries() < 0) {
+ // retry forever if negative value
return true;
}
// redeliver until we hit the max
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java (from r720962, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java&r1=720962&r2=721149&rev=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java Thu Nov 27 02:34:26 2008
@@ -16,26 +16,41 @@
*/
package org.apache.camel.processor.onexception;
+import org.apache.camel.Body;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeException;
+import org.apache.camel.Header;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.DeadLetterChannel;
/**
- * Unit test inspired by end user
+ * Unit test for the retry until predicate
*/
-public class OnExceptionHandleAndTransformTest extends ContextTestSupport {
+public class OnExceptionRetryUntilTest extends ContextTestSupport {
- public void testOnExceptionTransformConstant() throws Exception {
+ private static int invoked;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myRetryHandler", new MyRetryBean());
+ return jndi;
+ }
+
+ public void testRetryUntil() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
// START SNIPPET: e1
- // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
- // but we want to return a fixed text response, so we transform OUT body as Sorry.
+ // we want to use a predicate for retries so we can determine in our bean
+ // when retry should stop
onException(MyFunctionalException.class)
+ .retryUntil(bean("myRetryHandler"))
.handled(true)
.transform().constant("Sorry");
// END SNIPPET: e1
@@ -50,59 +65,24 @@
Object out = template.requestBody("direct:start", "Hello World");
assertEquals("Sorry", out);
+ assertEquals(3, invoked);
}
- public void testOnExceptionTransformExceptionMessage() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
-
- // START SNIPPET: e2
- // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
- // but we want to return a fixed text response, so we transform OUT body and return the exception message
- onException(MyFunctionalException.class)
- .handled(true)
- .transform(exceptionMessage());
- // END SNIPPET: e2
-
- from("direct:start").process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- throw new MyFunctionalException("Sorry you can not do this again to me");
- }
- });
- }
- });
-
- Object out = template.requestBody("direct:start", "Hello World");
- assertEquals("Sorry you can not do this again to me", out);
- }
-
- public void testOnExceptionSimpleLangaugeExceptionMessage() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
-
- // START SNIPPET: e3
- // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
- // but we want to return a fixed text response, so we transform OUT body and return a nice message
- // using the simple language where we want insert the exception message
- onException(MyFunctionalException.class)
- .handled(true)
- .transform().simple("Error reported: ${exception.message} - can not process this message.");
- // END SNIPPET: e3
-
- from("direct:start").process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- throw new MyFunctionalException("Out of order");
- }
- });
- }
- });
+ // START SNIPPET: e2
+ public class MyRetryBean {
- Object out = template.requestBody("direct:start", "Hello World");
- assertEquals("Error reported: Out of order - can not process this message.", out);
+ // using bean binding we can bind the information from the exchange to the types we have in our method signature
+ public boolean retryUntil(@Header(name = DeadLetterChannel.REDELIVERY_COUNTER) Integer counter, @Body String body, @ExchangeException Exception causedBy) {
+ // NOTE: counter is the redelivery attempt, will start from 1
+ invoked++;
+
+ assertEquals("Hello World", body);
+ assertTrue(causedBy instanceof MyFunctionalException);
+
+ // we can of course do whatever we want to determine the result, but this is a unit test so we end after 3 attempts
+ return counter < 3;
+ }
}
+ // END SNIPPET: e2
-}
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
svn:mergeinfo =