You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/21 13:18:09 UTC

svn commit: r746489 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Sat Feb 21 12:18:08 2009
New Revision: 746489

URL: http://svn.apache.org/viewvc?rev=746489&view=rev
Log:
CAMEL-1376, CAMEL-935: Introduced delayPattern and fixed a bug with onRedelivery in DeadLetterChannel.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryDelayPatternTest.java
      - copied, changed from r746448, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=746489&r1=746488&r2=746489&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Sat Feb 21 12:18:08 2009
@@ -89,6 +89,11 @@
         return this;
     }
 
+    public DeadLetterChannelBuilder delayPattern(String delayPattern) {
+        getRedeliveryPolicy().delayPattern(delayPattern);
+        return this;
+    }
+
     public DeadLetterChannelBuilder maximumRedeliveries(int maximumRedeliveries) {
         getRedeliveryPolicy().maximumRedeliveries(maximumRedeliveries);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=746489&r1=746488&r2=746489&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Sat Feb 21 12:18:08 2009
@@ -180,7 +180,7 @@
                 MessageHelper.resetStreamCache(exchange.getIn());
 
                 // wait until we should redeliver
-                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
 
                 // letting onRedeliver be executed
                 deliverToRedeliveryProcessor(exchange, callback, data);
@@ -257,7 +257,7 @@
                 exchange.setException(null);
             }
             // wait until we should redeliver
-            data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+            data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
             timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
 
             // letting onRedeliver be executed
@@ -293,23 +293,24 @@
      * Gives an optional configure redelivery processor a chance to process before the Exchange
      * will be redelivered. This can be used to alter the Exchange.
      */
-    private boolean deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
+    private void deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
                                             final RedeliveryData data) {
         if (redeliveryProcessor == null) {
-            return true;
+            return;
         }
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange before its redelivered");
+            LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
         }
+
         AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
-        boolean sync = afp.process(exchange, new AsyncCallback() {
+        afp.process(exchange, new AsyncCallback() {
             public void done(boolean sync) {
-                callback.done(data.sync);
+                LOG.trace("Redelivery processor done");
+                // do NOT call done on callback as this is the redelivery processor that
+                // is done. we should not mark the entire exchange as done.
             }
         });
-
-        return sync;
     }
     
     private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=746489&r1=746488&r2=746489&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Sat Feb 21 12:18:08 2009
@@ -20,6 +20,7 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.model.LoggingLevel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,23 @@
  * <p/>
  * Setting the maximumRedeliveries to a negative value such as -1 will then always redeliver (unlimited).
  * Setting the maximumRedeliveries to 0 will disable redelivery.
+ * <p/>
+ * This policy can be configured either by one of the following two settings:
+ * <ul>
+ *   <li>using conventional options, using all the options defined above</li>
+ *   <li>using delay pattern to declare intervals for delays</li>
+ * </ul>
+ * <p/>
+ * <b>Note:</b> If using delay patterns then the following options are not used (delay, backOffMultiplier, useExponentialBackOff, useCollisionAvoidance)
+ * <p/>
+ * <b>Using delay pattern</b>:
+ * <br/>The delay pattern syntax is: <tt>limit:delay;limit 2:delay 2;limit 3:delay 3;...;limit N:delay N</tt>.
+ * <p/>
+ * How it works is best illustrated with an example with this pattern: <tt>delayPattern=5:1000;10:5000;20:20000</tt>
+ * <br/>The delays will be for attempt in range 0..4 = 0 millis, 5..9 = 1000 millis, 10..19 = 5000 millis, >= 20 = 20000 millis.
+ * <p/>
+ * If you want to set a starting delay, then use 0 as the first limit, eg: <tt>0:1000;5:5000</tt> will use 1 sec delay
+ * until attempt number 5 where it will use 5 seconds going forward.
  *
  * @version $Revision$
  */
@@ -63,6 +81,7 @@
     protected boolean useCollisionAvoidance;
     protected LoggingLevel retriesExhaustedLogLevel = LoggingLevel.ERROR;
     protected LoggingLevel retryAttemptedLogLevel = LoggingLevel.ERROR;
+    protected String delayPattern;
 
     public RedeliveryPolicy() {
     }
@@ -77,7 +96,8 @@
             + ", useExponentialBackOff="  + useExponentialBackOff
             + ", backOffMultiplier=" + backOffMultiplier
             + ", useCollisionAvoidance=" + useCollisionAvoidance
-            + ", collisionAvoidanceFactor=" + collisionAvoidanceFactor + "]";
+            + ", collisionAvoidanceFactor=" + collisionAvoidanceFactor
+            + ", delayPattern=" + delayPattern + "]";
     }
 
     public RedeliveryPolicy copy() {
@@ -114,9 +134,13 @@
 
     /**
      * Calculates the new redelivery delay based on the last one then sleeps for the necessary amount of time
+     *
+     * @param redeliveryDelay  previous redelivery delay
+     * @param redeliveryCounter  number of previous redelivery attempts
+     * @return the calculated delay
      */
-    public long sleep(long redeliveryDelay) {
-        redeliveryDelay = getRedeliveryDelay(redeliveryDelay);
+    public long sleep(long redeliveryDelay, int redeliveryCounter) {
+        redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
 
         if (redeliveryDelay > 0) {
             if (LOG.isDebugEnabled()) {
@@ -125,18 +149,22 @@
             try {
                 Thread.sleep(redeliveryDelay);
             } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Thread interrupted: " + e, e);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Thread interrupted: " + e, e);
                 }
             }
         }
         return redeliveryDelay;
     }
 
+    protected long calculateRedeliveryDelay(long previousDelay, int redeliveryCounter) {
+        if (ObjectHelper.isNotEmpty(delayPattern)) {
+            // calculate delay using the pattern
+             return calculateRedeliverDelayUsingPattern(delayPattern, redeliveryCounter);
+        }
 
-    public long getRedeliveryDelay(long previousDelay) {
+        // calculate the delay using the conventional parameters
         long redeliveryDelay;
-
         if (previousDelay == 0) {
             redeliveryDelay = delay;
         } else if (useExponentialBackOff && backOffMultiplier > 1) {
@@ -164,6 +192,28 @@
         return redeliveryDelay;
     }
 
+    /**
+     * Calculates the delay using the delay pattern
+     */
+    protected static long calculateRedeliverDelayUsingPattern(String delayPattern, int redeliveryCounter) {
+        String[] groups = delayPattern.split(";");
+        // find the group where the redelivery counter matches
+        long answer = 0;
+        for (String group : groups) {
+            long delay = Long.valueOf(ObjectHelper.after(group, ":"));
+            int count = Integer.valueOf(ObjectHelper.before(group, ":"));
+            
+
+            if (count > redeliveryCounter) {
+                break;
+            } else {
+                answer = delay;
+            }
+        }
+
+        return answer;
+    }
+
 
     // Builder methods
     // -------------------------------------------------------------------------
@@ -238,6 +288,14 @@
         return this;
     }    
     
+    /**
+     * Sets the delay pattern with delay intervals.
+     */
+    public RedeliveryPolicy delayPattern(String delayPattern) {
+        setDelayPattern(delayPattern);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public double getBackOffMultiplier() {
@@ -352,4 +410,15 @@
     public LoggingLevel getRetryAttemptedLogLevel() {
         return retryAttemptedLogLevel;
     }
+
+    public String getDelayPattern() {
+        return delayPattern;
+    }
+
+    /**
+     * Sets an optional delay pattern to use instead of a fixed delay.
+     */
+    public void setDelayPattern(String delayPattern) {
+        this.delayPattern = delayPattern;
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?rev=746489&r1=746488&r2=746489&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Sat Feb 21 12:18:08 2009
@@ -38,7 +38,7 @@
         assertMockEndpointsSatisfied();
     }
 
-    public void testOnExceptionAlterMessageWithHeadersBeforeRedelivery() throws Exception {
+    public void xxxtestOnExceptionAlterMessageWithHeadersBeforeRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World123");
         mock.expectedHeaderReceived("foo", "123");

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryDelayPatternTest.java (from r746448, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryDelayPatternTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryDelayPatternTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java&r1=746448&r2=746489&rev=746489&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryDelayPatternTest.java Sat Feb 21 12:18:08 2009
@@ -19,109 +19,56 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.builder.ProcessorBuilder.throwException;
 
 /**
- * Unit test to verify that redelivery counters is working as expected.
+ * Unit test to verify delay pattern
  */
-public class DeadLetterChannelRedeliveryTest extends ContextTestSupport {
+public class DeadLetterChannelRedeliveryDelayPatternTest extends ContextTestSupport {
 
     private static int counter;
 
-    public void testRedeliveryTest() throws Exception {
+    public void testDelayPatternTest() throws Exception {
         counter = 0;
 
         // We expect the exchange here after 1 delivery and 2 re-deliveries
         MockEndpoint mock = getMockEndpoint("mock:error");
         mock.expectedMessageCount(1);
         mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
-        mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(2);
+        mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(3);
 
+        long start = System.currentTimeMillis();
         try {
             template.sendBody("direct:start", "Hello World");
-        } catch (RuntimeCamelException e) {
+            fail("Should have thrown exception");
+        } catch (Exception e) {
             // expected
+            assertEquals("Forced exception by unit test", e.getCause().getMessage());
         }
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should be slower", delta > 1000);
 
         assertMockEndpointsSatisfied();
 
-        assertEquals(3, counter); // One call + 2 re-deliveries
-    }
-
-    public void testNoRedeliveriesTest() throws Exception {
-        counter = 0;
-
-        // We expect the exchange here after 1 delivery
-        MockEndpoint mock = getMockEndpoint("mock:no");
-        mock.expectedMessageCount(1);
-        mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.FALSE);
-        mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(0);
-
-        try {
-            template.sendBody("direct:no", "Hello World");
-        } catch (RuntimeCamelException e) {
-            // expected
-        }
-
-        assertMockEndpointsSatisfied();
-
-        assertEquals(1, counter); // One call
-    }
-
-    public void testOneRedeliveryTest() throws Exception {
-        counter = 0;
-
-        // We expect the exchange here after 1 delivery and 1 re delivery
-        MockEndpoint mock = getMockEndpoint("mock:one");
-        mock.expectedMessageCount(1);
-        mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
-        mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(1);
-
-        try {
-            template.sendBody("direct:one", "Hello World");
-        } catch (RuntimeCamelException e) {
-            // expected
-        }
-
-        assertMockEndpointsSatisfied();
-
-        assertEquals(2, counter); // One call + 1 re-delivery
+        assertEquals(3, counter);
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("direct:start")
-                    .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2))
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            counter++;
-                            throw new Exception("Forced exception by unit test");
-                        }
-                    });
-
-                from("direct:no")
-                    .errorHandler(deadLetterChannel("mock:no").maximumRedeliveries(0))
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            counter++;
-                            throw new Exception("Forced exception by unit test");
-                        }
-                    });
-
-                from("direct:one")
-                    .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1))
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            counter++;
-                            throw new Exception("Forced exception by unit test");
-                        }
-                    });
+                errorHandler(deadLetterChannel("mock:error").delayPattern("0:250;2:500").maximumRedeliveries(3)
+                        .onRedelivery(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        counter++;
+                    }
+                }));
+
+                from("direct:start").process(throwException(new Exception("Forced exception by unit test")));
             }
         };
     }
 
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java?rev=746489&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java Sat Feb 21 12:18:08 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import junit.framework.TestCase;
+
+/**
+ *  @version $Revision$
+ */
+public class RedeliveryPolicyDelayPatternTest extends TestCase {
+
+    private RedeliveryPolicy policy = new RedeliveryPolicy();
+
+    public void testDelayPattern() throws Exception {
+        policy.setDelayPattern("3:1000;5:3000;10:5000;20:10000");
+
+        assertEquals(0, policy.calculateRedeliveryDelay(0, 0));
+        assertEquals(0, policy.calculateRedeliveryDelay(0, 1));
+        assertEquals(0, policy.calculateRedeliveryDelay(0, 2));
+        assertEquals(1000, policy.calculateRedeliveryDelay(0, 3));
+        assertEquals(1000, policy.calculateRedeliveryDelay(0, 4));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 5));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 5));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 6));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 7));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 8));
+        assertEquals(3000, policy.calculateRedeliveryDelay(0, 9));
+        assertEquals(5000, policy.calculateRedeliveryDelay(0, 10));
+        assertEquals(5000, policy.calculateRedeliveryDelay(0, 11));
+        assertEquals(5000, policy.calculateRedeliveryDelay(0, 15));
+        assertEquals(5000, policy.calculateRedeliveryDelay(0, 19));
+        assertEquals(10000, policy.calculateRedeliveryDelay(0, 20));
+        assertEquals(10000, policy.calculateRedeliveryDelay(0, 21));
+        assertEquals(10000, policy.calculateRedeliveryDelay(0, 25));
+        assertEquals(10000, policy.calculateRedeliveryDelay(0, 50));
+        assertEquals(10000, policy.calculateRedeliveryDelay(0, 100));
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyDelayPatternTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date