You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by jo...@apache.org on 2011/10/19 02:20:09 UTC

svn commit: r1185928 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/Exchange.java main/java/org/apache/camel/processor/RedeliveryErrorHandler.java test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java

Author: joed
Date: Wed Oct 19 00:20:09 2011
New Revision: 1185928

URL: http://svn.apache.org/viewvc?rev=1185928&view=rev
Log:
Support dynamic redelivery delays supplied via a message header.
Full test suite run in camel-core.

https://issues.apache.org/jira/browse/CAMEL-4558

Thanks Rich!

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1185928&r1=1185927&r2=1185928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Oct 19 00:20:09 2011
@@ -16,12 +16,12 @@
  */
 package org.apache.camel;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * An Exchange is the message container holding the information during the entire routing of
  * a {@link  Message} received by a {@link Consumer}. 
@@ -159,6 +159,7 @@ public interface Exchange {
     String REDELIVERY_COUNTER      = "CamelRedeliveryCounter";
     String REDELIVERY_MAX_COUNTER  = "CamelRedeliveryMaxCounter";
     String REDELIVERY_EXHAUSTED    = "CamelRedeliveryExhausted";
+    String REDELIVERY_DELAY        = "CamelRedeliveryDelay";
     String ROLLBACK_ONLY           = "CamelRollbackOnly";
     String ROLLBACK_ONLY_LAST      = "CamelRollbackOnlyLast";
     String ROUTE_STOP              = "CamelRouteStop";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1185928&r1=1185927&r2=1185928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Wed Oct 19 00:20:09 2011
@@ -16,32 +16,18 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.Message;
-import org.apache.camel.Predicate;
-import org.apache.camel.Processor;
+import org.apache.camel.*;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
 import org.apache.camel.spi.ThreadPoolProfile;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.*;
 import org.apache.camel.util.CamelLogger;
-import org.apache.camel.util.EventHelper;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.MessageHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Base redeliverable error handler that also supports a final dead letter queue in case
@@ -268,7 +254,7 @@ public abstract class RedeliveryErrorHan
 
             if (data.redeliveryCounter > 0) {
                 // calculate delay
-                data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
+                data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
 
                 if (data.redeliveryDelay > 0) {
                     // okay there is a delay so create a scheduled task to have it executed in the future
@@ -358,6 +344,32 @@ public abstract class RedeliveryErrorHan
     }
 
     /**
+     * <p>Determines the redelivery delay by first inspecting the IN message header {@link Exchange#REDELIVERY_DELAY}
+     * and, if it is not present, falling back to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}.</p>
+     *
+     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
+     * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
+     *
+     * @param exchange The current exchange in question.
+     * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
+     * @param redeliveryDelay The default redelivery delay from RedeliveryData
+     * @param redeliveryCounter The redeliveryCounter
+     * @return The time to wait before the next redelivery.
+     */
+    protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
+        // a delay supplied in the IN message header takes precedence over the policy
+        Long headerDelay = exchange.getIn().getHeader(Exchange.REDELIVERY_DELAY, Long.class);
+        if (headerDelay != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Redelivery delay is {} from Message.getHeader(Exchange.REDELIVERY_DELAY)", new Object[]{headerDelay});
+            }
+            return headerDelay;
+        }
+        // no header present, so let the redelivery policy calculate the delay
+        return redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
+    }
+
+    /**
      * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
      * <p/>
      * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java?rev=1185928&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java Wed Oct 19 00:20:09 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+public class RedeliveryWithExceptionAndFaultDelayInHeader extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOk() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testTransientAndPersistentError() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        String out = template.requestBody("direct:start", "Boom", String.class);
+        assertEquals("Persistent error", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testTransientAndPersistentErrorWithExchange() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Boom");
+            }
+        });
+        assertTrue("Should be failed", out.isFailed());
+        assertNull("No exception", out.getException());
+        assertTrue(out.getOut() != null && out.getOut().isFault());
+        assertEquals("Persistent error", out.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler().maximumRedeliveries(5));
+
+                from("direct:start")
+
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                exchange.getIn().setHeader(Exchange.REDELIVERY_DELAY, 100);
+                                counter++;
+                                if (counter < 3) {
+                                    throw new IllegalArgumentException("Try again");
+                                }
+
+                                if (exchange.getIn().getBody().equals("Boom")) {
+                                    exchange.getOut().setFault(true);
+                                    exchange.getOut().setBody("Persistent error");
+                                } else {
+                                    exchange.getOut().setBody("Bye World");
+                                }
+                            }
+                        }).to("mock:result");
+            }
+        };
+    }
+}