You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/10 19:00:00 UTC

svn commit: r932763 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/aggregate/ components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/

Author: davsclaus
Date: Sat Apr 10 16:59:59 2010
New Revision: 932763

URL: http://svn.apache.org/viewvc?rev=932763&view=rev
Log:
CAMEL-2568: RecoverableAggregationRepository now has max redeliveries and dead letter channel.

Added:
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java
      - copied, changed from r932757, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=932763&r1=932762&r2=932763&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Apr 10 16:59:59 2010
@@ -589,30 +589,42 @@ public class AggregateProcessor extends 
                         // and mark it as redelivered
                         exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
 
-                        // update current redelivery state
+                        // get the current redelivery data
                         RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
-                        if (data == null) {
-                            // create new data
-                            data = new RedeliveryData();
-                            redeliveryState.put(exchange.getExchangeId(), data);
-                        }
-                        data.redeliveryCounter++;
-
-                        // set redelivery counter
-                        exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
 
                         // if we are exhausted, then move to dead letter channel
-                        if (recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter > recoverable.getMaximumRedeliveries()) {
+                        if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
                             LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
                                     + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+
+                            // send to DLC
                             try {
+                                // set redelivery counter
+                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
                                 deadLetterProcessor.process(exchange);
-                                // confirm after it has been moved to dead letter channel
-                                recoverable.confirm(camelContext, exchangeId);
                             } catch (Exception e) {
-                                getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), e);
+                                exchange.setException(e);
+                            }
+
+                            // handle if failed
+                            if (exchange.getException() != null) {
+                                getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
+                            } else {
+                                // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
+                                recoverable.confirm(camelContext, exchangeId);
                             }
                         } else {
+                            // update current redelivery state
+                            if (data == null) {
+                                // create new data
+                                data = new RedeliveryData();
+                                redeliveryState.put(exchange.getExchangeId(), data);
+                            }
+                            data.redeliveryCounter++;
+
+                            // set redelivery counter
+                            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
                             }

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java (from r932757, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java&r1=932757&r2=932763&rev=932763&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelFailedTest.java Sat Apr 10 16:59:59 2010
@@ -24,7 +24,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregateRecoverDeadLetterChannelTest extends CamelTestSupport {
+public class HawtDBAggregateRecoverDeadLetterChannelFailedTest extends CamelTestSupport {
 
     private HawtDBAggregationRepository<String> repo;
 
@@ -37,7 +37,7 @@ public class HawtDBAggregateRecoverDeadL
         // exhaust after at most 3 attempts
         repo.setMaximumRedeliveries(3);
         // and move to this dead letter channel
-        repo.setDeadLetterUri("mock:dead");
+        repo.setDeadLetterUri("direct:dead");
         // check faster
         repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
 
@@ -45,13 +45,19 @@ public class HawtDBAggregateRecoverDeadL
     }
 
     @Test
-    public void testHawtDBAggregateRecoverDeadLetterChannel() throws Exception {
+    public void testHawtDBAggregateRecoverDeadLetterChannelFailed() throws Exception {
         // should fail all times
         getMockEndpoint("mock:result").expectedMessageCount(0);
         getMockEndpoint("mock:aggregated").expectedMessageCount(4);
-        getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
+        // it should keep sending to DLC if it failed, so test for min 3 messages
+        getMockEndpoint("mock:dead").expectedMinimumMessageCount(3);
+        // all the details should be the same about redelivered and redelivered 3 times
         getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
-        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(4);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -75,6 +81,10 @@ public class HawtDBAggregateRecoverDeadL
                         .throwException(new IllegalArgumentException("Damn"))
                         .to("mock:result")
                     .end();
+
+                from("direct:dead")
+                    .to("mock:dead")
+                    .throwException(new IllegalArgumentException("We are dead"));
             }
         };
     }

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java?rev=932763&r1=932762&r2=932763&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverDeadLetterChannelTest.java Sat Apr 10 16:59:59 2010
@@ -51,7 +51,7 @@ public class HawtDBAggregateRecoverDeadL
         getMockEndpoint("mock:aggregated").expectedMessageCount(4);
         getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
         getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
-        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(4);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);