You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/24 10:49:43 UTC

svn commit: r1052470 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/main/java/org/apache/camel/processor/l...

Author: davsclaus
Date: Fri Dec 24 09:49:43 2010
New Revision: 1052470

URL: http://svn.apache.org/viewvc?rev=1052470&view=rev
Log:
CAMEL-3460: Added the RedeliveryMaxCounter header to the message when Camel performs redelivery, so the end user can know when it is the last redelivery attempt.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Dec 24 09:49:43 2010
@@ -143,12 +143,13 @@ public interface Exchange {
 
     String ON_COMPLETION = "CamelOnCompletion";
 
-    String REDELIVERED          = "CamelRedelivered";
-    String REDELIVERY_COUNTER   = "CamelRedeliveryCounter";
-    String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
-    String ROLLBACK_ONLY        = "CamelRollbackOnly";
-    String ROLLBACK_ONLY_LAST   = "CamelRollbackOnlyLast";
-    String ROUTE_STOP           = "CamelRouteStop";
+    String REDELIVERED             = "CamelRedelivered";
+    String REDELIVERY_COUNTER      = "CamelRedeliveryCounter";
+    String REDELIVERY_MAX_COUNTER  = "CamelRedeliveryMaxCounter";
+    String REDELIVERY_EXHAUSTED    = "CamelRedeliveryExhausted";
+    String ROLLBACK_ONLY           = "CamelRollbackOnly";
+    String ROLLBACK_ONLY_LAST      = "CamelRollbackOnlyLast";
+    String ROUTE_STOP              = "CamelRouteStop";
 
     String SOAP_ACTION        = "CamelSoapAction";
     String SKIP_GZIP_ENCODING = "CamelSkipGzipEncoding";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Fri Dec 24 09:49:43 2010
@@ -480,6 +480,7 @@ public abstract class RedeliveryErrorHan
         // its continued then remove traces of redelivery attempted and caught exception
         exchange.getIn().removeHeader(Exchange.REDELIVERED);
         exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
         // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
 
         // create log message
@@ -534,7 +535,7 @@ public abstract class RedeliveryErrorHan
                 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
         logFailedDelivery(true, false, false, exchange, msg, data, e);
 
-        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
     }
 
     /**
@@ -579,6 +580,7 @@ public abstract class RedeliveryErrorHan
             // its handled then remove traces of redelivery attempted
             exchange.getIn().removeHeader(Exchange.REDELIVERED);
             exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
             handled = true;
         } else {
             // must decrement the redelivery counter as we didn't process the redelivery but is
@@ -806,7 +808,7 @@ public abstract class RedeliveryErrorHan
      * Increments the redelivery counter and adds the redelivered flag if the
      * message has been redelivered
      */
-    private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
+    private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
         Message in = exchange.getIn();
         Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
         int next = 1;
@@ -815,6 +817,10 @@ public abstract class RedeliveryErrorHan
         }
         in.setHeader(Exchange.REDELIVERY_COUNTER, next);
         in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+        // if maximum redeliveries is used, then provide that information as well
+        if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
+            in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
+        }
         return next;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Dec 24 09:49:43 2010
@@ -738,6 +738,9 @@ public class AggregateProcessor extends 
 
                             // set redelivery counter
                             exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+                            if (recoverable.getMaximumRedeliveries() > 0) {
+                                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+                            }
 
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Fri Dec 24 09:49:43 2010
@@ -209,6 +209,7 @@ public class FailOverLoadBalancer extend
         exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null);
         exchange.getIn().removeHeader(Exchange.REDELIVERED);
         exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
     }
 
     private boolean processExchange(Processor processor, Exchange exchange,

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Fri Dec 24 09:49:43 2010
@@ -64,8 +64,8 @@ public class DeadLetterChannelOnRedelive
                 // MyRedeliveryProcessor before a redelivery is
                 // attempted. This allows us to alter the message before
                 errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5)
-                        .onRedelivery(new MyRedeliverPrcessor())
-                        // setting delay to zero is just to make unit teting faster
+                        .onRedelivery(new MyRedeliverProcessor())
+                        // setting delay to zero is just to make unit testing faster
                         .redeliveryDelay(0L));
                 // END SNIPPET: e1
 
@@ -86,7 +86,7 @@ public class DeadLetterChannelOnRedelive
     // START SNIPPET: e2
     // This is our processor that is executed before every redelivery attempt
     // here we can do what we want in the java code, such as altering the message
-    public class MyRedeliverPrcessor implements Processor {
+    public class MyRedeliverProcessor implements Processor {
 
         public void process(Exchange exchange) throws Exception {
             // the message is being redelivered so we can alter it
@@ -94,9 +94,13 @@ public class DeadLetterChannelOnRedelive
             // we just append the redelivery counter to the body
             // you can of course do all kind of stuff instead
             String body = exchange.getIn().getBody(String.class);
-            int count = exchange.getIn().getHeader("CamelRedeliveryCounter", Integer.class);
+            int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
 
             exchange.getIn().setBody(body + count);
+
+            // the maximum redelivery was set to 5
+            int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
+            assertEquals(5, max);
         }
     }
     // END SNIPPET: e2

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -39,6 +39,7 @@ public class DeadLetterChannelTest exten
         successEndpoint.expectedBodiesReceived(body);
         successEndpoint.message(0).header(Exchange.REDELIVERED).isEqualTo(true);
         successEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(1);
+        successEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(2);
 
         deadEndpoint.expectedMessageCount(0);
 
@@ -54,6 +55,7 @@ public class DeadLetterChannelTest exten
         // no traces of redelivery as the dead letter channel will handle the exception when moving the DLQ
         deadEndpoint.message(0).header(Exchange.REDELIVERED).isNull();
         deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isNull();
+        deadEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
         successEndpoint.expectedMessageCount(0);
 
         sendBody("direct:start", body);
@@ -77,6 +79,7 @@ public class DeadLetterChannelTest exten
         // no traces of redelivery as the dead letter channel will handle the exception when moving the DLQ
         deadEndpoint.message(0).header(Exchange.REDELIVERED).isNull();
         deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isNull();
+        deadEndpoint.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
         successEndpoint.expectedMessageCount(0);
 
         template.requestBody("direct:start", body);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java Fri Dec 24 09:49:43 2010
@@ -46,6 +46,7 @@ public class RedeliveryPolicyPerExceptio
         log.info("Found message with headers: " + in.getHeaders());
 
         assertMessageHeader(in, Exchange.REDELIVERY_COUNTER, 2);
+        assertMessageHeader(in, Exchange.REDELIVERY_MAX_COUNTER, 2);
         assertMessageHeader(in, Exchange.REDELIVERED, true);
     }
 
@@ -63,6 +64,7 @@ public class RedeliveryPolicyPerExceptio
         log.info("Found message with headers: " + in.getHeaders());
 
         assertMessageHeader(in, Exchange.REDELIVERY_COUNTER, 0);
+        assertMessageHeader(in, Exchange.REDELIVERY_MAX_COUNTER, null);
         assertMessageHeader(in, Exchange.REDELIVERED, false);
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -60,6 +60,7 @@ public class AsyncDeadLetterChannelTest 
         mock.expectedMessageCount(1);
         mock.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
         mock.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+        mock.message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(2);
 
         try {
             template.requestBody("direct:in", "Hello World");

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java Fri Dec 24 09:49:43 2010
@@ -48,10 +48,23 @@ public class HawtDBAggregateRecoverDeadL
     public void testHawtDBAggregateRecoverDeadLetterChannel() throws Exception {
         // should fail all times
         getMockEndpoint("mock:result").expectedMessageCount(0);
+
         getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:aggregated").message(0).header(Exchange.REDELIVERED).isNull();
+        getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(1);
+        getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+        getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+
         getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
         getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
         getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java?rev=1052470&r1=1052469&r2=1052470&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java Fri Dec 24 09:49:43 2010
@@ -54,6 +54,7 @@ public class HawtDBAggregateRecoverWithR
         getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
         // on the 2nd redelivery attempt we success
         getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);