You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/12/03 05:51:06 UTC
svn commit: r722772 - in /activemq/camel/branches/camel-1.x: ./
camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Author: ningjiang
Date: Tue Dec 2 20:51:05 2008
New Revision: 722772
URL: http://svn.apache.org/viewvc?rev=722772&view=rev
Log:
Merged revisions 722726 via svnmerge from
https://svn.apache.org/repos/asf/activemq/camel/trunk
........
r722726 | ningjiang | 2008-12-03 10:54:15 +0800 (Wed, 03 Dec 2008) | 1 line
CAMEL-1129 Enhance ErrorHandler RedeliveryPolicy with a Timer manager to avoid locking current thread while sleeping
........
Modified:
activemq/camel/branches/camel-1.x/ (props changed)
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 2 20:51:05 2008
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=722772&r1=722771&r2=722772&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Dec 2 20:51:05 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.AsyncCallback;
@@ -50,6 +52,7 @@
private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
+ private static Timer timer = new Timer();
private Processor output;
private Processor deadLetter;
private AsyncProcessor outputAsync;
@@ -66,6 +69,40 @@
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
Processor failureProcessor = deadLetter;
}
+
+ private class RedeliverTimerTask extends TimerTask {
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final RedeliveryData data;
+
+ public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.data = data;
+ }
+
+ @Override
+ public void run() {
+ //only handle the real AsyncProcess the exchange
+ outputAsync.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ // Only handle the async case...
+ if (sync) {
+ return;
+ }
+ data.sync = false;
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (exchange.getException() != null && !isFailureHandled(exchange)) {
+ // if we are redelivering then sleep before trying again
+ asyncProcess(exchange, callback, data);
+ } else {
+ callback.done(sync);
+ }
+ }
+ });
+ }
+ }
public DeadLetterChannel(Processor output, Processor deadLetter) {
this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
@@ -118,45 +155,12 @@
// did previous processing caused an exception?
if (exchange.getException() != null) {
- Throwable e = exchange.getException();
- // set the original caused exception
- exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
-
- // find the error handler to use (if any)
- ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
- if (exceptionPolicy != null) {
- data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
- data.handledPredicate = exceptionPolicy.getHandledPolicy();
- Processor processor = exceptionPolicy.getErrorHandler();
- if (processor != null) {
- data.failureProcessor = processor;
- }
- }
-
- logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
- data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+ handleException(exchange, data);
}
// should we redeliver or not?
if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
- // we did not success with the redelivery so now we let the failure processor handle it
- setFailureHandled(exchange);
- // must decrement the redelivery counter as we didn't process the redelivery but is
- // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
- decrementRedeliveryCounter(exchange);
-
- AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
- boolean sync = afp.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- restoreExceptionOnExchange(exchange, data.handledPredicate);
- callback.done(data.sync);
- }
- });
-
- // The line below shouldn't be needed, it is invoked by the AsyncCallback above
- //restoreExceptionOnExchange(exchange, data.handledPredicate);
- logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
- return sync;
+ return deliverToFaultProcessor(exchange, callback, data);
}
// should we redeliver
@@ -180,8 +184,9 @@
data.sync = false;
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
- if (exchange.getException() != null && !isFailureHandled(exchange)) {
- process(exchange, callback, data);
+ if (exchange.getException() != null && !isFailureHandled(exchange)) {
+ //TODO Call the Timer for the asyncProcessor
+ asyncProcess(exchange, callback, data);
} else {
callback.done(sync);
}
@@ -215,6 +220,91 @@
}
}
+ protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+ // set the timer here
+ if (!isRunAllowed()) {
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(data.sync);
+ return;
+ }
+
+ // if the exchange is transacted then let the underlying system handle the redelivery etc.
+ // this DeadLetterChannel is only for non transacted exchanges
+ if (exchange.isTransacted() && exchange.getException() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
+ }
+ return;
+ }
+
+ // did previous processing caused an exception?
+ if (exchange.getException() != null) {
+ handleException(exchange, data);
+ }
+
+ if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ deliverToFaultProcessor(exchange, callback, data);
+ return;
+ }
+
+ // process the next try
+ // if we are redelivering then sleep before trying again
+ if (data.redeliveryCounter > 0) {
+ // okay we will give it another go so clear the exception so we can try again
+ if (exchange.getException() != null) {
+ exchange.setException(null);
+ }
+ // wait until we should redeliver
+ data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+ timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
+ }
+ }
+
+ private void handleException(Exchange exchange, RedeliveryData data) {
+ Throwable e = exchange.getException();
+ // set the original caused exception
+ exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
+
+ // find the error handler to use (if any)
+ ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+ if (exceptionPolicy != null) {
+ data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
+ data.handledPredicate = exceptionPolicy.getHandledPolicy();
+ Processor processor = exceptionPolicy.getErrorHandler();
+ if (processor != null) {
+ data.failureProcessor = processor;
+ }
+ }
+
+ logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+ data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+
+ }
+
+ private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
+ final RedeliveryData data) {
+ // we did not success with the redelivery so now we let the failure processor handle it
+ setFailureHandled(exchange);
+ // must decrement the redelivery counter as we didn't process the redelivery but is
+ // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
+ decrementRedeliveryCounter(exchange);
+
+ AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
+ boolean sync = afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ restoreExceptionOnExchange(exchange, data.handledPredicate);
+ callback.done(data.sync);
+ }
+ });
+
+ // The line below shouldn't be needed, it is invoked by the AsyncCallback above
+ //restoreExceptionOnExchange(exchange, data.handledPredicate);
+ logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null);
+ return sync;
+ }
+
public static boolean isFailureHandled(Exchange exchange) {
return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
|| exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
@@ -331,5 +421,6 @@
@Override
protected void doStop() throws Exception {
ServiceHelper.stopServices(deadLetter, output);
- }
+ }
+
}
Modified: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722772&r1=722771&r2=722772&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Tue Dec 2 20:51:05 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -76,7 +78,7 @@
}
protected RouteBuilder createRouteBuilder() {
- final Processor processor = new Processor() {
+ final Processor processor = new AsyncProcessor() {
public void process(Exchange exchange) {
Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
Integer.class);
@@ -86,13 +88,32 @@
+ " being less than: " + failUntilAttempt);
}
}
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
+ Integer.class);
+ int attempt = (counter == null) ? 1 : counter + 1;
+ if (attempt > 1) {
+ assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Timer-0");
+ }
+
+ if (attempt < failUntilAttempt) {
+ // we can't throw the exception here , or the callback will not be invoked.
+ exchange.setException(new RuntimeException("Failed to process due to attempt: " + attempt
+ + " being less than: " + failUntilAttempt));
+ }
+ callback.done(false);
+ return false;
+ }
};
return new RouteBuilder() {
public void configure() {
from("direct:start").errorHandler(
deadLetterChannel("mock:failed").maximumRedeliveries(2)
- .initialRedeliveryDelay(1)
+
+ .initialRedeliveryDelay(1000)
+
.loggingLevel(LoggingLevel.DEBUG)
).process(processor).to("mock:success");