You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/15 08:18:47 UTC

svn commit: r784665 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/main/java/org/apache/camel/spring/spi/

Author: davsclaus
Date: Mon Jun 15 06:18:47 2009
New Revision: 784665

URL: http://svn.apache.org/viewvc?rev=784665&view=rev
Log:
CAMEL-1706: DefaultErrorHandler is now as powerful as DLC.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Mon Jun 15 06:18:47 2009
@@ -18,6 +18,7 @@
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
 
 /**
@@ -50,6 +51,15 @@
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 
+    public boolean supportDeadLetterQueue() {
+        return true;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        // just to let the stacktrace reveal that this is a dead letter channel
+        super.process(exchange);
+    }
+
     @Override
     public String toString() {
         return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
@@ -43,6 +44,15 @@
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 
+    public boolean supportDeadLetterQueue() {
+        return false;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        // just to let the stacktrace reveal that this is a dead letter channel
+        super.process(exchange);
+    }
+
     @Override
     public String toString() {
         return "DefaultErrorHandler[" + output + "]";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -42,12 +42,6 @@
     // TODO: support onException being able to use other onException to route they exceptions
     // (hard one to get working, has not been supported before)
 
-    // TODO: the while loop method should be refactored a bit so its more higher level so the
-    // code is easier to read and understand
-
-    // TODO: add support for onRedeliver(SocketException.class) to allow running som custom
-    // route when this given exception is being redelivered (create ticket and add in 2.1)
-
     protected final Processor deadLetter;
     protected final String deadLetterUri;
     protected final Processor output;
@@ -64,7 +58,8 @@
 
         // default behavior which can be overloaded on a per exception basis
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
-        Processor failureProcessor = deadLetter;
+        Processor deadLetterProcessor = deadLetter;
+        Processor failureProcessor;
         Processor onRedeliveryProcessor = redeliveryProcessor;
         Predicate handledPredicate = handledPolicy;
         boolean useOriginalInBody = useOriginalBodyPolicy;
@@ -87,6 +82,11 @@
         return false;
     }
 
+    /**
+     * Whether this error handler supports dead letter queue or not
+     */
+    public abstract boolean supportDeadLetterQueue();
+
     public void process(Exchange exchange) throws Exception {
         processErrorHandler(exchange, new RedeliveryData());
     }
@@ -117,7 +117,7 @@
                 return;
             }
 
-            // did previous processing caused an exception?
+            // did previous processing cause an exception?
             if (exchange.getException() != null) {
                 handleException(exchange, data);
             }
@@ -125,17 +125,19 @@
             // compute if we should redeliver or not
             boolean shouldRedeliver = shouldRedeliver(exchange, data);
             if (!shouldRedeliver) {
-                // TODO: divde into onException and deadLetterQueue
-                // no then move it to the dead letter queue
-                deliverToFailureProcessor(exchange, data);
-                // prepare the exchange for failure
+                // no we should not redeliver to the same output so either try an onException (if any given)
+                // or the dead letter queue
+                Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
+                // deliver to the failure processor (either an on exception or dead letter queue
+                deliverToFailureProcessor(target, exchange, data);
+                // prepare the exchange for failure before returning
                 prepareExchangeAfterFailure(exchange, data);
-                // we could not process the exchange succesfully so break
+                // and then return
                 return;
             }
 
             // if we are redelivering then sleep before trying again
-            if (data.redeliveryCounter > 0) {
+            if (shouldRedeliver && data.redeliveryCounter > 0) {
                 prepareExchangeForRedelivery(exchange);
 
                 // wait until we should redeliver
@@ -151,15 +153,15 @@
                 deliverToRedeliveryProcessor(exchange, data);
             }
 
-            // process the exchange
+            // process the exchange (also redelivery)
             try {
                 output.process(exchange);
             } catch (Exception e) {
                 exchange.setException(e);
             }
 
-            // only process if the exchange hasn't failed
-            // and it has not been handled by the error processor
+            // only done if the exchange hasn't failed
+            // and it has not been handled by the failure processor
             boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
             if (done) {
                 return;
@@ -263,7 +265,8 @@
     /**
      * All redelivery attempts failed so move the exchange to the dead letter queue
      */
-    protected void deliverToFailureProcessor(final Exchange exchange, final RedeliveryData data) {
+    protected void deliverToFailureProcessor(final Processor processor, final Exchange exchange,
+                                             final RedeliveryData data) {
         // we did not success with the redelivery so now we let the failure processor handle it
         ExchangeHelper.setFailureHandled(exchange);
         // must decrement the redelivery counter as we didn't process the redelivery but is
@@ -272,11 +275,11 @@
         // reset cached streams so they can be read again
         MessageHelper.resetStreamCache(exchange.getIn());
 
-        if (data.failureProcessor != null) {
+        if (processor != null) {
             // prepare original IN body if it should be moved instead of current body
             if (data.useOriginalInBody) {
                 if (log.isTraceEnabled()) {
-                    log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body");
+                    log.trace("Using the original IN body instead of the current IN body");
                 }
 
                 Object original = exchange.getUnitOfWork().getOriginalInBody();
@@ -284,17 +287,17 @@
             }
 
             if (log.isTraceEnabled()) {
-                log.trace("Failure processor " + data.failureProcessor + " is processing Exchange: " + exchange);
+                log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
             }
             try {
-                data.failureProcessor.process(exchange);
+                processor.process(exchange);
             } catch (Exception e) {
                 exchange.setException(e);
             }
             log.trace("Failure processor done");
 
             String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
-                    + ". Processed by failure processor: " + data.failureProcessor;
+                    + ". Processed by failure processor: " + processor;
             logFailedDelivery(false, exchange, msg, data, null);
         }
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java Mon Jun 15 06:18:47 2009
@@ -95,7 +95,7 @@
         return new RouteBuilder() {
             public void configure() throws Exception {
                 from("direct:start")
-                    .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2))
+                    .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliverDelay(0))
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             counter++;
@@ -113,7 +113,7 @@
                     });
 
                 from("direct:one")
-                    .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1))
+                    .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1).redeliverDelay(0))
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             counter++;

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -41,6 +41,8 @@
  */
 public class TransactionErrorHandler extends ErrorHandlerSupport {
 
+    // TODO: extend RedeliverErrorHandler
+
     private static final transient Log LOG = LogFactory.getLog(TransactionErrorHandler.class);
     private final TransactionTemplate transactionTemplate;
     private Processor output;