You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/12/03 05:51:06 UTC

svn commit: r722772 - in /activemq/camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java

Author: ningjiang
Date: Tue Dec  2 20:51:05 2008
New Revision: 722772

URL: http://svn.apache.org/viewvc?rev=722772&view=rev
Log:
Merged revisions 722726 via svnmerge from 
https://svn.apache.org/repos/asf/activemq/camel/trunk

........
  r722726 | ningjiang | 2008-12-03 10:54:15 +0800 (Wed, 03 Dec 2008) | 1 line
  
  CAMEL-1129 Enhance ErrorHandler RedeliveryPolicy with a Timer manager to avoid locking current thread while sleeping
........

Modified:
    activemq/camel/branches/camel-1.x/   (props changed)
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec  2 20:51:05 2008
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=722772&r1=722771&r2=722772&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Dec  2 20:51:05 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
@@ -50,6 +52,7 @@
     private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
     private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
     
+    private static Timer timer = new Timer();
     private Processor output;
     private Processor deadLetter;
     private AsyncProcessor outputAsync;
@@ -66,6 +69,40 @@
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
         Processor failureProcessor = deadLetter;
     }
+    
+    private class RedeliverTimerTask extends TimerTask {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final RedeliveryData data;
+        
+        public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.data = data;
+        }
+
+        @Override
+        public void run() {
+            // redeliver the exchange through the asynchronous output processor
+            outputAsync.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    // Only handle the async case...
+                    if (sync) {
+                        return;
+                    }
+                    data.sync = false;
+                    // only process if the exchange hasn't failed
+                    // and it has not been handled by the error processor
+                    if (exchange.getException() != null && !isFailureHandled(exchange)) {
+                        // still failing: asyncProcess decides whether to schedule another redelivery
+                        asyncProcess(exchange, callback, data);
+                    } else {
+                        callback.done(sync);
+                    }
+                }
+            });                
+        } 
+    }
 
     public DeadLetterChannel(Processor output, Processor deadLetter) {
         this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
@@ -118,45 +155,12 @@
 
             // did previous processing caused an exception?
             if (exchange.getException() != null) {
-                Throwable e = exchange.getException();
-                // set the original caused exception
-                exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
-
-                // find the error handler to use (if any)
-                ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
-                if (exceptionPolicy != null) {
-                    data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
-                    data.handledPredicate = exceptionPolicy.getHandledPolicy();
-                    Processor processor = exceptionPolicy.getErrorHandler();
-                    if (processor != null) {
-                        data.failureProcessor = processor;
-                    }                    
-                }
-                
-                logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
-                data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+                handleException(exchange, data);
             }
 
             // should we redeliver or not?
             if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
-                // we did not success with the redelivery so now we let the failure processor handle it
-                setFailureHandled(exchange);
-                // must decrement the redelivery counter as we didn't process the redelivery but is
-                // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
-                decrementRedeliveryCounter(exchange);
-
-                AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
-                boolean sync = afp.process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        restoreExceptionOnExchange(exchange, data.handledPredicate);
-                        callback.done(data.sync);
-                    }
-                });
-
-                // The line below shouldn't be needed, it is invoked by the AsyncCallback above
-                //restoreExceptionOnExchange(exchange, data.handledPredicate);
-                logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
-                return sync;
+                return deliverToFaultProcessor(exchange, callback, data);
             }
 
             // should we redeliver
@@ -180,8 +184,9 @@
                     data.sync = false;
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
-                    if (exchange.getException() != null && !isFailureHandled(exchange)) {
-                        process(exchange, callback, data);
+                    if (exchange.getException() != null && !isFailureHandled(exchange)) {                        
+                        // hand over to asyncProcess, which schedules any redelivery on the Timer
+                        asyncProcess(exchange, callback, data);
                     } else {
                         callback.done(sync);
                     }
@@ -215,6 +220,91 @@
         }
     }
 
+    protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+        // reject the exchange and complete the callback when isRunAllowed() is false
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(data.sync);
+            return;
+        }
+
+        // if the exchange is transacted then let the underlying system handle the redelivery etc.
+        // this DeadLetterChannel is only for non transacted exchanges
+        if (exchange.isTransacted() && exchange.getException() != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
+            }
+            return;
+        }
+        
+        // did the previous processing cause an exception?
+        if (exchange.getException() != null) {
+            handleException(exchange, data);
+        }        
+        
+        if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+            deliverToFaultProcessor(exchange, callback, data);
+            return;
+        }
+        
+        // process the next try
+        // if we are redelivering then schedule the next attempt on the timer instead of sleeping
+        if (data.redeliveryCounter > 0) {
+            // okay we will give it another go so clear the exception so we can try again
+            if (exchange.getException() != null) {
+                exchange.setException(null);
+            }
+            // compute the next delay and let the timer trigger the redelivery after it
+            data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+            timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
+        }        
+    }
+    
+    private void handleException(Exchange exchange, RedeliveryData data) {
+        Throwable e = exchange.getException();
+        // set the original caused exception
+        exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
+
+        // find the error handler to use (if any)
+        ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+        if (exceptionPolicy != null) {
+            data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
+            data.handledPredicate = exceptionPolicy.getHandledPolicy();
+            Processor processor = exceptionPolicy.getErrorHandler();
+            if (processor != null) {
+                data.failureProcessor = processor;
+            }                    
+        }
+        
+        logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+        
+    }
+    
+    private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
+                                            final RedeliveryData data) {
+        // the redelivery did not succeed, so now we let the failure processor handle it
+        setFailureHandled(exchange);
+        // must decrement the redelivery counter as we did not perform the redelivery; the exchange is
+        // being handled by the failure handler instead. So we must -1 to keep the counter in sync
+        decrementRedeliveryCounter(exchange);
+
+        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
+        boolean sync = afp.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                restoreExceptionOnExchange(exchange, data.handledPredicate);
+                callback.done(data.sync);
+            }
+        });
+
+        // The line below shouldn't be needed, it is invoked by the AsyncCallback above
+        //restoreExceptionOnExchange(exchange, data.handledPredicate);
+        logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
+        return sync;
+    }
+
     public static boolean isFailureHandled(Exchange exchange) {
         return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null 
             || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
@@ -331,5 +421,6 @@
     @Override
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(deadLetter, output);
-    }
+    }    
+    
 }

Modified: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722772&r1=722771&r2=722772&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Tue Dec  2 20:51:05 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -76,7 +78,7 @@
     }
 
     protected RouteBuilder createRouteBuilder() {
-        final Processor processor = new Processor() {
+        final Processor processor = new AsyncProcessor() {
             public void process(Exchange exchange) {
                 Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
                                                              Integer.class);
@@ -86,13 +88,32 @@
                                                + " being less than: " + failUntilAttempt);
                 }
             }
+
+            public boolean process(Exchange exchange, AsyncCallback callback) {                
+                Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
+                                                             Integer.class);
+                int attempt = (counter == null) ? 1 : counter + 1;
+                if (attempt > 1) {
+                    assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Timer-0");
+                }
+                
+                if (attempt < failUntilAttempt) {
+                    // we can't throw the exception here, otherwise the callback would not be invoked.
+                    exchange.setException(new RuntimeException("Failed to process due to attempt: " + attempt
+                                               + " being less than: " + failUntilAttempt));
+                }
+                callback.done(false);
+                return false;
+            }
         };
 
         return new RouteBuilder() {
             public void configure() {
                 from("direct:start").errorHandler(
                     deadLetterChannel("mock:failed").maximumRedeliveries(2)
-                        .initialRedeliveryDelay(1)
+
+                        .initialRedeliveryDelay(1000)
+
                         .loggingLevel(LoggingLevel.DEBUG)
 
                 ).process(processor).to("mock:success");