You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/11/27 11:34:27 UTC

svn commit: r721149 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/onexception/

Author: davsclaus
Date: Thu Nov 27 02:34:26 2008
New Revision: 721149

URL: http://svn.apache.org/viewvc?rev=721149&view=rev
Log:
CAMEL-102: Added retryUntil predicate to onException so end user can have fine grained control when retry should stop.

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java   (contents, props changed)
      - copied, changed from r720962, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java Thu Nov 27 02:34:26 2008
@@ -55,6 +55,8 @@
     private List<String> exceptions = new ArrayList<String>();
     @XmlElement(name = "onWhen", required = false)
     private WhenType onWhen;
+    @XmlElement(name = "retryUntil", required = false)
+    private ExpressionSubElementType retryUntil;
     @XmlElement(name = "redeliveryPolicy", required = false)
     private RedeliveryPolicyType redeliveryPolicy;
     @XmlElement(name = "handled", required = false)
@@ -67,6 +69,8 @@
     private Processor errorHandler;
     @XmlTransient
     private Predicate handledPolicy;
+    @XmlTransient
+    private Predicate retryUntilPolicy;
 
     public ExceptionType() {
     }
@@ -106,6 +110,7 @@
 
     public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
         setHandledFromExpressionType(routeContext);
+        setRetryUntilFromExpressionType(routeContext);
         // lets attach a processor to an error handler
         errorHandler = routeContext.createProcessor(this);
         ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
@@ -192,6 +197,28 @@
     }
 
     /**
+     * Sets the retry until predicate.
+     *
+     * @param until predicate that determines when to stop retrying
+     * @return the builder
+     */
+    public ExceptionType retryUntil(Predicate until) {
+        setRetryUntilPolicy(until);
+        return this;
+    }
+
+    /**
+     * Sets the retry until expression.
+     *
+     * @param until expression that determines when to stop retrying
+     * @return the builder
+     */
+    public ExceptionType retryUntil(Expression until) {
+        setRetryUntilPolicy(toPredicate(until));
+        return this;
+    }
+
+    /**
      * Sets the back off multiplier
      *
      * @param backOffMultiplier  the back off multiplier
@@ -356,12 +383,6 @@
     public ExpressionSubElementType getHandled() {
         return handled;
     }    
-    
-    private void setHandledFromExpressionType(RouteContext routeContext) {
-        if (getHandled() != null && handledPolicy == null && routeContext != null) {  
-            handled(getHandled().createPredicate(routeContext));
-        }
-    }
 
     public void setHandledPolicy(Predicate handledPolicy) {
         this.handledPolicy = handledPolicy;
@@ -375,6 +396,22 @@
         this.onWhen = onWhen;
     }
 
+    public ExpressionSubElementType getRetryUntil() {
+        return retryUntil;
+    }
+
+    public void setRetryUntil(ExpressionSubElementType retryUntil) {
+        this.retryUntil = retryUntil;
+    }
+
+    public Predicate getRetryUntilPolicy() {
+        return retryUntilPolicy;
+    }
+
+    public void setRetryUntilPolicy(Predicate retryUntilPolicy) {
+        this.retryUntilPolicy = retryUntilPolicy;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
     protected RedeliveryPolicyType getOrCreateRedeliveryPolicy() {
@@ -393,4 +430,18 @@
         }
         return answer;
     }
+
+
+    private void setHandledFromExpressionType(RouteContext routeContext) {
+        if (getHandled() != null && handledPolicy == null && routeContext != null) {
+            handled(getHandled().createPredicate(routeContext));
+        }
+    }
+
+    private void setRetryUntilFromExpressionType(RouteContext routeContext) {
+        if (getRetryUntil() != null && retryUntilPolicy == null && routeContext != null) {
+            retryUntil(getRetryUntil().createPredicate(routeContext));
+        }
+    }
+
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Nov 27 02:34:26 2008
@@ -61,6 +61,7 @@
         long redeliveryDelay;
         boolean sync = true;
         Predicate handledPredicate;
+        Predicate retryUntilPredicate;
 
         // default behavior which can be overloaded on a per exception basis
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
@@ -91,11 +92,18 @@
         return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
     }
 
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         return process(exchange, callback, new RedeliveryData());
     }
 
-    public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+    /**
+     * Processes the exchange using decorated with this dead letter channel.
+     */
+    protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
 
         while (true) {
             // we can't keep retrying if the route is being shutdown.
@@ -127,18 +135,21 @@
                 if (exceptionPolicy != null) {
                     data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
                     data.handledPredicate = exceptionPolicy.getHandledPolicy();
+                    data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
                     Processor processor = exceptionPolicy.getErrorHandler();
                     if (processor != null) {
                         data.failureProcessor = processor;
                     }                    
                 }
                 
-                logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+                logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
+                        + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
                 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
             }
 
-            // should we redeliver or not?
-            if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+            // compute if we should redeliver or not
+            boolean shouldRedeliver = shouldRedeliver(exchange, data);
+            if (!shouldRedeliver) {
                 // we did not success with the redelivery so now we let the failure processor handle it
                 setFailureHandled(exchange);
                 // must decrement the redelivery counter as we didn't process the redelivery but is
@@ -155,11 +166,12 @@
 
                 // The line below shouldn't be needed, it is invoked by the AsyncCallback above
                 //restoreExceptionOnExchange(exchange, data.handledPredicate);
-                logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
+                logFailedDelivery(false, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
+                        + ". Handled by the failure processor: " + data.failureProcessor, data, null);
                 return sync;
             }
 
-            // should we redeliver
+            // if we are redelivering then sleep before trying again
             if (data.redeliveryCounter > 0) {
                 // okay we will give it another go so clear the exception so we can try again
                 if (exchange.getException() != null) {
@@ -201,53 +213,20 @@
 
     }
 
-    private void logFailedDelivery(String message, RedeliveryData data, Throwable e) {
-        LoggingLevel newLogLevel = null;
-        if (data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
-            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
-        } else {
-            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
-        }
-        if (e != null) {
-            logger.log(message, e, newLogLevel);
-        } else {
-            logger.log(message, newLogLevel);
-        }
-    }
+    // Properties
+    // -------------------------------------------------------------------------
 
     public static boolean isFailureHandled(Exchange exchange) {
-        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null 
+        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
             || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
     }
 
     public static void setFailureHandled(Exchange exchange) {
         exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
-        exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());        
+        exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
         exchange.setException(null);
     }
 
-    protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
-        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
-            }
-            // exception not handled, put exception back in the exchange
-            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
-            }
-            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
-        }
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    // Properties
-    // -------------------------------------------------------------------------
-
     /**
      * Returns the output processor
      */
@@ -287,8 +266,42 @@
     }
 
     // Implementation methods
+
     // -------------------------------------------------------------------------
 
+    protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
+        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
+            }
+            // exception not handled, put exception back in the exchange
+            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
+            }
+            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
+        }
+    }
+
+    private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
+        LoggingLevel newLogLevel;
+        if (shouldRedeliver) {
+            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+        } else {
+            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+        }
+        if (e != null) {
+            logger.log(message, e, newLogLevel);
+        } else {
+            logger.log(message, newLogLevel);
+        }
+    }
+
+    private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
+        return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
+    }
+
     /**
      * Increments the redelivery counter and adds the redelivered flag if the
      * message has been redelivered

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=721149&r1=721148&r2=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Thu Nov 27 02:34:26 2008
@@ -16,9 +16,10 @@
  */
 package org.apache.camel.processor;
 
-import java.io.Serializable;
 import java.util.Random;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.model.LoggingLevel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,10 +90,21 @@
 
     /**
      * Returns true if the policy decides that the message exchange should be
-     * redelivered
-     */
-    public boolean shouldRedeliver(int redeliveryCounter) {
+     * redelivered.
+     *
+     * @param exchange  the current exchange
+     * @param redeliveryCounter  the current retry counter
+     * @param retryUntil  an optional predicate to determine if we should redeliver or not
+     * @return true to redeliver, false to stop
+     */
+    public boolean shouldRedeliver(Exchange exchange, int redeliveryCounter, Predicate retryUntil) {
+        // predicate is always used if provided
+        if (retryUntil != null) {
+            return retryUntil.matches(exchange);
+        }
+
         if (getMaximumRedeliveries() < 0) {
+            // retry forever if negative value
             return true;
         }
         // redeliver until we hit the max

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java (from r720962, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java&r1=720962&r2=721149&rev=721149&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionHandleAndTransformTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java Thu Nov 27 02:34:26 2008
@@ -16,26 +16,41 @@
  */
 package org.apache.camel.processor.onexception;
 
+import org.apache.camel.Body;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeException;
+import org.apache.camel.Header;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.DeadLetterChannel;
 
 /**
- * Unit test inspired by end user
+ * Unit test for the retry until predicate
  */
-public class OnExceptionHandleAndTransformTest extends ContextTestSupport {
+public class OnExceptionRetryUntilTest extends ContextTestSupport {
 
-    public void testOnExceptionTransformConstant() throws Exception {
+    private static int invoked;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRetryHandler", new MyRetryBean());
+        return jndi;
+    }
+
+    public void testRetryUntil() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
 
                 // START SNIPPET: e1
-                // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
-                // but we want to return a fixed text response, so we transform OUT body as Sorry.
+                // we want to use a predicate for retries so we can determine in our bean
+                // when retry should stop
                 onException(MyFunctionalException.class)
+                        .retryUntil(bean("myRetryHandler"))
                         .handled(true)
                         .transform().constant("Sorry");
                 // END SNIPPET: e1
@@ -50,59 +65,24 @@
 
         Object out = template.requestBody("direct:start", "Hello World");
         assertEquals("Sorry", out);
+        assertEquals(3, invoked);
     }
 
-    public void testOnExceptionTransformExceptionMessage() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
-
-                // START SNIPPET: e2
-                // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
-                // but we want to return a fixed text response, so we transform OUT body and return the exception message
-                onException(MyFunctionalException.class)
-                        .handled(true)
-                        .transform(exceptionMessage());
-                // END SNIPPET: e2
-
-                from("direct:start").process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        throw new MyFunctionalException("Sorry you can not do this again to me");
-                    }
-                });
-            }
-        });
-
-        Object out = template.requestBody("direct:start", "Hello World");
-        assertEquals("Sorry you can not do this again to me", out);
-    }
-
-    public void testOnExceptionSimpleLangaugeExceptionMessage() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
-
-                // START SNIPPET: e3
-                // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
-                // but we want to return a fixed text response, so we transform OUT body and return a nice message
-                // using the simple language where we want insert the exception message
-                onException(MyFunctionalException.class)
-                        .handled(true)
-                        .transform().simple("Error reported: ${exception.message} - can not process this message.");
-                // END SNIPPET: e3
-
-                from("direct:start").process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        throw new MyFunctionalException("Out of order");
-                    }
-                });
-            }
-        });
+    // START SNIPPET: e2
+    public class MyRetryBean {
 
-        Object out = template.requestBody("direct:start", "Hello World");
-        assertEquals("Error reported: Out of order - can not process this message.", out);
+        // using bean binding we can bind the information from the exchange to the types we have in our method signature
+        public boolean retryUntil(@Header(name = DeadLetterChannel.REDELIVERY_COUNTER) Integer counter, @Body String body, @ExchangeException Exception causedBy) {
+            // NOTE: counter is the redelivery attempt, will start from 1
+            invoked++;
+
+            assertEquals("Hello World", body);
+            assertTrue(causedBy instanceof MyFunctionalException);
+
+            // we can of course do what ever we want to determine the result but this is a unit test so we end after 3 attempts
+            return counter < 3;
+        }
     }
+    // END SNIPPET: e2
 
-}
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
------------------------------------------------------------------------------
    svn:mergeinfo =