You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/15 08:18:47 UTC
svn commit: r784665 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/processor/
components/camel-spring/src/main/java/org/apache/camel/spring/spi/
Author: davsclaus
Date: Mon Jun 15 06:18:47 2009
New Revision: 784665
URL: http://svn.apache.org/viewvc?rev=784665&view=rev
Log:
CAMEL-1706: DefaultErrorHandler is now as powerful as DLC.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Mon Jun 15 06:18:47 2009
@@ -18,6 +18,7 @@
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
/**
@@ -50,6 +51,15 @@
setExceptionPolicy(exceptionPolicyStrategy);
}
+ public boolean supportDeadLetterQueue() {
+ return true;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ // just to let the stacktrace reveal that this is a dead letter channel
+ super.process(exchange);
+ }
+
@Override
public String toString() {
return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
@@ -43,6 +44,15 @@
setExceptionPolicy(exceptionPolicyStrategy);
}
+ public boolean supportDeadLetterQueue() {
+ return false;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ // just to let the stacktrace reveal that this is a default error handler
+ super.process(exchange);
+ }
+
@Override
public String toString() {
return "DefaultErrorHandler[" + output + "]";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -42,12 +42,6 @@
// TODO: support onException being able to use other onException to route their exceptions
// (hard one to get working, has not been supported before)
- // TODO: the while loop method should be refactored a bit so its more higher level so the
- // code is easier to read and understand
-
- // TODO: add support for onRedeliver(SocketException.class) to allow running som custom
- // route when this given exception is being redelivered (create ticket and add in 2.1)
-
protected final Processor deadLetter;
protected final String deadLetterUri;
protected final Processor output;
@@ -64,7 +58,8 @@
// default behavior which can be overloaded on a per exception basis
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
- Processor failureProcessor = deadLetter;
+ Processor deadLetterProcessor = deadLetter;
+ Processor failureProcessor;
Processor onRedeliveryProcessor = redeliveryProcessor;
Predicate handledPredicate = handledPolicy;
boolean useOriginalInBody = useOriginalBodyPolicy;
@@ -87,6 +82,11 @@
return false;
}
+ /**
+ * Whether this error handler supports dead letter queue or not
+ */
+ public abstract boolean supportDeadLetterQueue();
+
public void process(Exchange exchange) throws Exception {
processErrorHandler(exchange, new RedeliveryData());
}
@@ -117,7 +117,7 @@
return;
}
- // did previous processing caused an exception?
+ // did previous processing cause an exception?
if (exchange.getException() != null) {
handleException(exchange, data);
}
@@ -125,17 +125,19 @@
// compute if we should redeliver or not
boolean shouldRedeliver = shouldRedeliver(exchange, data);
if (!shouldRedeliver) {
- // TODO: divde into onException and deadLetterQueue
- // no then move it to the dead letter queue
- deliverToFailureProcessor(exchange, data);
- // prepare the exchange for failure
+ // no we should not redeliver to the same output so either try an onException (if any given)
+ // or the dead letter queue
+ Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
+ // deliver to the failure processor (either an on exception or dead letter queue)
+ deliverToFailureProcessor(target, exchange, data);
+ // prepare the exchange for failure before returning
prepareExchangeAfterFailure(exchange, data);
- // we could not process the exchange succesfully so break
+ // and then return
return;
}
// if we are redelivering then sleep before trying again
- if (data.redeliveryCounter > 0) {
+ if (shouldRedeliver && data.redeliveryCounter > 0) {
prepareExchangeForRedelivery(exchange);
// wait until we should redeliver
@@ -151,15 +153,15 @@
deliverToRedeliveryProcessor(exchange, data);
}
- // process the exchange
+ // process the exchange (also redelivery)
try {
output.process(exchange);
} catch (Exception e) {
exchange.setException(e);
}
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
+ // only done if the exchange hasn't failed
+ // and it has not been handled by the failure processor
boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
if (done) {
return;
@@ -263,7 +265,8 @@
/**
* All redelivery attempts failed so move the exchange to the dead letter queue
*/
- protected void deliverToFailureProcessor(final Exchange exchange, final RedeliveryData data) {
+ protected void deliverToFailureProcessor(final Processor processor, final Exchange exchange,
+ final RedeliveryData data) {
// we did not succeed with the redelivery so now we let the failure processor handle it
ExchangeHelper.setFailureHandled(exchange);
// must decrement the redelivery counter as we didn't process the redelivery but is
@@ -272,11 +275,11 @@
// reset cached streams so they can be read again
MessageHelper.resetStreamCache(exchange.getIn());
- if (data.failureProcessor != null) {
+ if (processor != null) {
// prepare original IN body if it should be moved instead of current body
if (data.useOriginalInBody) {
if (log.isTraceEnabled()) {
- log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body");
+ log.trace("Using the original IN body instead of the current IN body");
}
Object original = exchange.getUnitOfWork().getOriginalInBody();
@@ -284,17 +287,17 @@
}
if (log.isTraceEnabled()) {
- log.trace("Failure processor " + data.failureProcessor + " is processing Exchange: " + exchange);
+ log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
}
try {
- data.failureProcessor.process(exchange);
+ processor.process(exchange);
} catch (Exception e) {
exchange.setException(e);
}
log.trace("Failure processor done");
String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
- + ". Processed by failure processor: " + data.failureProcessor;
+ + ". Processed by failure processor: " + processor;
logFailedDelivery(false, exchange, msg, data, null);
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java Mon Jun 15 06:18:47 2009
@@ -95,7 +95,7 @@
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:start")
- .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2))
+ .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliverDelay(0))
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
counter++;
@@ -113,7 +113,7 @@
});
from("direct:one")
- .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1))
+ .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1).redeliverDelay(0))
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
counter++;
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=784665&r1=784664&r2=784665&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Mon Jun 15 06:18:47 2009
@@ -41,6 +41,8 @@
*/
public class TransactionErrorHandler extends ErrorHandlerSupport {
+ // TODO: extend RedeliveryErrorHandler
+
private static final transient Log LOG = LogFactory.getLog(TransactionErrorHandler.class);
private final TransactionTemplate transactionTemplate;
private Processor output;