You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/15 06:55:50 UTC
svn commit: r784652 [1/2] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/bean/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/pro...
Author: davsclaus
Date: Mon Jun 15 04:55:49 2009
New Revision: 784652
URL: http://svn.apache.org/viewvc?rev=784652&view=rev
Log:
CAMEL-1706: DefaultErrorHandler is now as powerful as DLC.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerRedeliveryTest.java
- copied, changed from r784508, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionWithDefaultErrorHandlerTest.java
- copied, changed from r784546, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilWithDefaultErrorHandlerTest.java
- copied, changed from r784546, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRouteWithDefaultErrorHandlerTest.java
- copied, changed from r784546, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRouteTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionSubRouteWithDefaultErrorHandlerTest.java
- copied, changed from r784546, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionSubRouteTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDefaultErrorHandlerNotHandledPolicyTest.java
- copied, changed from r784546, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDeadLetterChannelNotHandledPolicyTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringDefaultErrorHandlerNotHandledPolicyTest.xml
- copied, changed from r784546, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringDeadLetterChannelNotHandledPolicyTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithRetryLoggingLevelSetTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInfoSelectMethodTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/DefaultParameterMappingStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelUseOriginalInBodyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/CustomExceptionPolicyStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsUseOriginalBodyTest.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/onexception/onExceptionSubRouteWithDefaultErrorHandlerTest.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Mon Jun 15 04:55:49 2009
@@ -330,16 +330,6 @@
* <a href="http://camel.apache.org/dead-letter-channel.html">Dead Letter Channel EIP:</a>
* is a error handler for handling messages that could not be delivered to it's intented destination.
*
- * @return the builder
- */
- public DeadLetterChannelBuilder deadLetterChannel() {
- return new DeadLetterChannelBuilder();
- }
-
- /**
- * <a href="http://camel.apache.org/dead-letter-channel.html">Dead Letter Channel EIP:</a>
- * is a error handler for handling messages that could not be delivered to it's intented destination.
- *
* @param deadLetterUri uri to the dead letter endpoint storing dead messages
* @return the builder
*/
@@ -360,6 +350,7 @@
// Properties
// -------------------------------------------------------------------------
+
public CamelContext getContext() {
return context;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Mon Jun 15 04:55:49 2009
@@ -23,16 +23,12 @@
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.DeadLetterChannel;
-import org.apache.camel.processor.ErrorHandlerSupport;
import org.apache.camel.processor.Logger;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.spi.RouteContext;
-import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import static org.apache.camel.builder.PredicateBuilder.toPredicate;
/**
* A builder of a <a
@@ -41,46 +37,24 @@
*
* @version $Revision$
*/
-public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
- private static final boolean HANDLED = true;
- private Logger logger = new Logger(LogFactory.getLog(DeadLetterChannel.class), LoggingLevel.ERROR);
- private ExceptionPolicyStrategy exceptionPolicyStrategy = ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
- private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
- private Processor onRedelivery;
- private Processor failureProcessor;
- private Endpoint deadLetter;
- private String deadLetterUri;
- private Predicate handledPolicy;
- private boolean useOriginalBody;
-
- /**
- * Creates a default DeadLetterChannel with a default endpoint
- */
+public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder {
+
public DeadLetterChannelBuilder() {
- this("log:org.apache.camel.DeadLetterChannel?level=error");
+ // no-arg constructor used by Spring DSL
}
- /**
- * Creates a DeadLetterChannel using the given endpoint
- *
- * @param deadLetter the dead letter queue
- */
public DeadLetterChannelBuilder(Endpoint deadLetter) {
setDeadLetter(deadLetter);
}
- /**
- * Creates a DeadLetterChannel using the given endpoint
- *
- * @param uri the dead letter queue
- */
public DeadLetterChannelBuilder(String uri) {
setDeadLetterUri(uri);
}
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- DeadLetterChannel answer = new DeadLetterChannel(processor, getFailureProcessor(), deadLetterUri, onRedelivery,
- getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy(), getHandledPolicy(), isUseOriginalBody());
+ DeadLetterChannel answer = new DeadLetterChannel(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
+ getHandledPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(),
+ isUseOriginalBody());
// must enable stream cache as DeadLetterChannel can do redeliveries and
// thus it needs to be able to read the stream again
configure(answer);
@@ -91,196 +65,6 @@
return false;
}
- // Builder methods
- // -------------------------------------------------------------------------
- public DeadLetterChannelBuilder backOffMultiplier(double backOffMultiplier) {
- getRedeliveryPolicy().backOffMultiplier(backOffMultiplier);
- return this;
- }
-
- public DeadLetterChannelBuilder collisionAvoidancePercent(short collisionAvoidancePercent) {
- getRedeliveryPolicy().collisionAvoidancePercent(collisionAvoidancePercent);
- return this;
- }
-
- public DeadLetterChannelBuilder redeliverDelay(long delay) {
- getRedeliveryPolicy().redeliverDelay(delay);
- return this;
- }
-
- public DeadLetterChannelBuilder delayPattern(String delayPattern) {
- getRedeliveryPolicy().delayPattern(delayPattern);
- return this;
- }
-
- public DeadLetterChannelBuilder maximumRedeliveries(int maximumRedeliveries) {
- getRedeliveryPolicy().maximumRedeliveries(maximumRedeliveries);
- return this;
- }
-
- public DeadLetterChannelBuilder disableRedelivery() {
- getRedeliveryPolicy().maximumRedeliveries(0);
- return this;
- }
-
- public DeadLetterChannelBuilder maximumRedeliveryDelay(long maximumRedeliveryDelay) {
- getRedeliveryPolicy().maximumRedeliveryDelay(maximumRedeliveryDelay);
- return this;
- }
-
- public DeadLetterChannelBuilder useCollisionAvoidance() {
- getRedeliveryPolicy().useCollisionAvoidance();
- return this;
- }
-
- public DeadLetterChannelBuilder useExponentialBackOff() {
- getRedeliveryPolicy().useExponentialBackOff();
- return this;
- }
-
- public DeadLetterChannelBuilder retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
- getRedeliveryPolicy().setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
- return this;
- }
-
- public DeadLetterChannelBuilder retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
- getRedeliveryPolicy().setRetryAttemptedLogLevel(retryAttemptedLogLevel);
- return this;
- }
-
- public DeadLetterChannelBuilder logStackTrace(boolean logStackTrace) {
- getRedeliveryPolicy().setLogStackTrace(logStackTrace);
- return this;
- }
-
- /**
- * Sets whether the exchange should be marked as handled or not.
- *
- * @param handled handled or not
- * @return the builder
- */
- public DeadLetterChannelBuilder handled(boolean handled) {
- Expression expression = ExpressionBuilder.constantExpression(Boolean.toString(handled));
- return handled(expression);
- }
-
- /**
- * Sets whether the exchange should be marked as handled or not.
- *
- * @param handled predicate that determines true or false
- * @return the builder
- */
- public DeadLetterChannelBuilder handled(Predicate handled) {
- this.setHandledPolicy(handled);
- return this;
- }
-
- /**
- * Sets whether the exchange should be marked as handled or not.
- *
- * @param handled expression that determines true or false
- * @return the builder
- */
- public DeadLetterChannelBuilder handled(Expression handled) {
- this.setHandledPolicy(toPredicate(handled));
- return this;
- }
-
- /**
- * Sets the logger used for caught exceptions
- *
- * @param logger the logger
- * @return the builder
- */
- public DeadLetterChannelBuilder logger(Logger logger) {
- setLogger(logger);
- return this;
- }
-
- /**
- * Sets the logging level of exceptions caught
- *
- * @param level the logging level
- * @return the builder
- */
- public DeadLetterChannelBuilder loggingLevel(LoggingLevel level) {
- getLogger().setLevel(level);
- return this;
- }
-
- /**
- * Sets the log used for caught exceptions
- *
- * @param log the logger
- * @return the builder
- */
- public DeadLetterChannelBuilder log(Log log) {
- getLogger().setLog(log);
- return this;
- }
-
- /**
- * Sets the log used for caught exceptions
- *
- * @param log the log name
- * @return the builder
- */
- public DeadLetterChannelBuilder log(String log) {
- return log(LogFactory.getLog(log));
- }
-
- /**
- * Sets the log used for caught exceptions
- *
- * @param log the log class
- * @return the builder
- */
- public DeadLetterChannelBuilder log(Class log) {
- return log(LogFactory.getLog(log));
- }
-
- /**
- * Sets the exception policy to use
- *
- * @return the builder
- */
- public DeadLetterChannelBuilder exceptionPolicyStrategy(ExceptionPolicyStrategy exceptionPolicyStrategy) {
- setExceptionPolicyStrategy(exceptionPolicyStrategy);
- return this;
- }
-
- /**
- * Sets a processor that should be processed <b>before</b> a redelivey attempt.
- * <p/>
- * Can be used to change the {@link org.apache.camel.Exchange} <b>before</b> its being redelivered.
- *
- * @return the builder
- */
- public DeadLetterChannelBuilder onRedelivery(Processor processor) {
- setOnRedelivery(processor);
- return this;
- }
-
- /**
- * Will use the original input body when an {@link Exchange} is moved to the dead letter queue.
- * <p/>
- * <b>Notice:</b> this only applies when all redeliveries attempt have failed and the {@link Exchange} is doomed for failure.
- * <br/>
- * Instead of using the current inprogress {@link Exchange} IN body we use the original IN body instead. This allows
- * you to store the original input in the dead letter queue instead of the inprogress snapshot of the IN body.
- * For instance if you route transform the IN body during routing and then failed. With the original exchange
- * store in the dead letter queue it might be easier to manually re submit the {@link Exchange} again as the IN body
- * is the same as when Camel received it. So you should be able to send the {@link Exchange} to the same input.
- * <p/>
- * By default this feature is off.
- *
- * @return the builder
- */
- public DeadLetterChannelBuilder useOriginalBody() {
- setUseOriginalBody(true);
- return this;
- }
-
// Properties
// -------------------------------------------------------------------------
@@ -304,95 +88,18 @@
return failureProcessor;
}
- public void setFailureProcessor(Processor failureProcessor) {
- this.failureProcessor = failureProcessor;
- }
-
- public String getDeadLetterUri() {
- return deadLetterUri;
- }
-
- public void setDeadLetterUri(String deadLetterUri) {
- this.deadLetter = null;
- this.deadLetterUri = deadLetterUri;
- }
-
- public Endpoint getDeadLetter() {
- return deadLetter;
- }
-
- public void setDeadLetter(Endpoint deadLetter) {
- this.deadLetter = deadLetter;
- this.deadLetterUri = deadLetter.getEndpointUri();
- }
-
- public RedeliveryPolicy getRedeliveryPolicy() {
- return redeliveryPolicy;
- }
-
- /**
- * Sets the redelivery policy
- */
- public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
- this.redeliveryPolicy = redeliveryPolicy;
- }
-
- public Logger getLogger() {
- return logger;
+ protected Predicate createHandledPolicy() {
+ // should be handled by default for dead letter channel
+ return PredicateBuilder.toPredicate(ExpressionBuilder.constantExpression(true));
}
- public void setLogger(Logger logger) {
- this.logger = logger;
- }
-
- /**
- * Sets the exception policy strategy to use for resolving the {@link org.apache.camel.model.OnExceptionDefinition}
- * to use for a given thrown exception
- */
- public ExceptionPolicyStrategy getExceptionPolicyStrategy() {
- return exceptionPolicyStrategy;
- }
-
- public void setExceptionPolicyStrategy(ExceptionPolicyStrategy exceptionPolicyStrategy) {
- this.exceptionPolicyStrategy = exceptionPolicyStrategy;
- }
-
- public Processor getOnRedelivery() {
- return onRedelivery;
- }
-
- public void setOnRedelivery(Processor onRedelivery) {
- this.onRedelivery = onRedelivery;
- }
-
- public Predicate getHandledPolicy() {
- if (handledPolicy == null) {
- createHandledPolicy();
- }
- return handledPolicy;
- }
-
- public void setHandledPolicy(Predicate handled) {
- this.handledPolicy = handled;
- }
-
- /**
- * Sets the handled using a boolean and thus easier to use for Spring XML configuration as well
- */
- public void setHandled(boolean handled) {
- handled(handled);
- }
-
- public boolean isUseOriginalBody() {
- return useOriginalBody;
- }
-
- public void setUseOriginalBody(boolean useOriginalBody) {
- this.useOriginalBody = useOriginalBody;
+ @Override
+ protected RedeliveryPolicy createRedeliveryPolicy() {
+ return new RedeliveryPolicy();
}
- protected void createHandledPolicy() {
- handledPolicy = PredicateBuilder.toPredicate(ExpressionBuilder.constantExpression(HANDLED));
+ protected Logger createLogger() {
+ return new Logger(LogFactory.getLog(DeadLetterChannel.class), LoggingLevel.ERROR);
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Mon Jun 15 04:55:49 2009
@@ -16,9 +16,20 @@
*/
package org.apache.camel.builder;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.DefaultErrorHandler;
+import org.apache.camel.processor.ErrorHandlerSupport;
+import org.apache.camel.processor.Logger;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.camel.builder.PredicateBuilder.toPredicate;
/**
* The default error handler builder.
@@ -27,14 +38,345 @@
*/
public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport {
- public boolean supportTransacted() {
- return false;
+ protected Logger logger;
+ protected ExceptionPolicyStrategy exceptionPolicyStrategy = ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
+ protected RedeliveryPolicy redeliveryPolicy;
+ protected Processor onRedelivery;
+ protected Predicate handledPolicy;
+ protected Processor failureProcessor;
+ protected Endpoint deadLetter;
+ protected String deadLetterUri;
+ protected boolean useOriginalBody;
+
+ public DefaultErrorHandlerBuilder() {
}
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) {
- DefaultErrorHandler answer = new DefaultErrorHandler(processor, getExceptionPolicyStrategy());
+ public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
+ DefaultErrorHandler answer = new DefaultErrorHandler(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
+ getHandledPolicy(), getExceptionPolicyStrategy());
+ // must enable stream cache as DeadLetterChannel can do redeliveries and
+ // thus it needs to be able to read the stream again
configure(answer);
return answer;
}
+ public boolean supportTransacted() {
+ return false;
+ }
+
+
+ // Builder methods
+ // -------------------------------------------------------------------------
+ public DefaultErrorHandlerBuilder backOffMultiplier(double backOffMultiplier) {
+ getRedeliveryPolicy().backOffMultiplier(backOffMultiplier);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder collisionAvoidancePercent(short collisionAvoidancePercent) {
+ getRedeliveryPolicy().collisionAvoidancePercent(collisionAvoidancePercent);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder redeliverDelay(long delay) {
+ getRedeliveryPolicy().redeliverDelay(delay);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder delayPattern(String delayPattern) {
+ getRedeliveryPolicy().delayPattern(delayPattern);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder maximumRedeliveries(int maximumRedeliveries) {
+ getRedeliveryPolicy().maximumRedeliveries(maximumRedeliveries);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder disableRedelivery() {
+ getRedeliveryPolicy().maximumRedeliveries(0);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder maximumRedeliveryDelay(long maximumRedeliveryDelay) {
+ getRedeliveryPolicy().maximumRedeliveryDelay(maximumRedeliveryDelay);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder useCollisionAvoidance() {
+ getRedeliveryPolicy().useCollisionAvoidance();
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder useExponentialBackOff() {
+ getRedeliveryPolicy().useExponentialBackOff();
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
+ getRedeliveryPolicy().setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
+ getRedeliveryPolicy().setRetryAttemptedLogLevel(retryAttemptedLogLevel);
+ return this;
+ }
+
+ public DefaultErrorHandlerBuilder logStackTrace(boolean logStackTrace) {
+ getRedeliveryPolicy().setLogStackTrace(logStackTrace);
+ return this;
+ }
+
+ /**
+ * Sets whether the exchange should be marked as handled or not.
+ *
+ * @param handled handled or not
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder handled(boolean handled) {
+ Expression expression = ExpressionBuilder.constantExpression(Boolean.toString(handled));
+ return handled(expression);
+ }
+
+ /**
+ * Sets whether the exchange should be marked as handled or not.
+ *
+ * @param handled predicate that determines true or false
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder handled(Predicate handled) {
+ this.setHandledPolicy(handled);
+ return this;
+ }
+
+ /**
+ * Sets whether the exchange should be marked as handled or not.
+ *
+ * @param handled expression that determines true or false
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder handled(Expression handled) {
+ this.setHandledPolicy(toPredicate(handled));
+ return this;
+ }
+
+ /**
+ * Sets the logger used for caught exceptions
+ *
+ * @param logger the logger
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder logger(Logger logger) {
+ setLogger(logger);
+ return this;
+ }
+
+ /**
+ * Sets the logging level of exceptions caught
+ *
+ * @param level the logging level
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder loggingLevel(LoggingLevel level) {
+ getLogger().setLevel(level);
+ return this;
+ }
+
+ /**
+ * Sets the log used for caught exceptions
+ *
+ * @param log the logger
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder log(Log log) {
+ getLogger().setLog(log);
+ return this;
+ }
+
+ /**
+ * Sets the log used for caught exceptions
+ *
+ * @param log the log name
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder log(String log) {
+ return log(LogFactory.getLog(log));
+ }
+
+ /**
+ * Sets the log used for caught exceptions
+ *
+ * @param log the log class
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder log(Class log) {
+ return log(LogFactory.getLog(log));
+ }
+
+ /**
+ * Sets the exception policy to use
+ *
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder exceptionPolicyStrategy(ExceptionPolicyStrategy exceptionPolicyStrategy) {
+ setExceptionPolicyStrategy(exceptionPolicyStrategy);
+ return this;
+ }
+
+ /**
+ * Sets a processor that should be processed <b>before</b> a redelivey attempt.
+ * <p/>
+ * Can be used to change the {@link org.apache.camel.Exchange} <b>before</b> its being redelivered.
+ *
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder onRedelivery(Processor processor) {
+ setOnRedelivery(processor);
+ return this;
+ }
+
+ /**
+ * Will use the original input body when an {@link org.apache.camel.Exchange} is moved to the dead letter queue.
+ * <p/>
+ * <b>Notice:</b> this only applies when all redeliveries attempt have failed and the {@link org.apache.camel.Exchange} is doomed for failure.
+ * <br/>
+ * Instead of using the current inprogress {@link org.apache.camel.Exchange} IN body we use the original IN body instead. This allows
+ * you to store the original input in the dead letter queue instead of the inprogress snapshot of the IN body.
+ * For instance if you route transform the IN body during routing and then failed. With the original exchange
+ * store in the dead letter queue it might be easier to manually re submit the {@link org.apache.camel.Exchange} again as the IN body
+ * is the same as when Camel received it. So you should be able to send the {@link org.apache.camel.Exchange} to the same input.
+ * <p/>
+ * By default this feature is off.
+ *
+ * @return the builder
+ */
+ public DefaultErrorHandlerBuilder useOriginalBody() {
+ setUseOriginalBody(true);
+ return this;
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+
+ public Processor getFailureProcessor() {
+ return failureProcessor;
+ }
+
+ public void setFailureProcessor(Processor failureProcessor) {
+ this.failureProcessor = failureProcessor;
+ }
+
+ public RedeliveryPolicy getRedeliveryPolicy() {
+ if (redeliveryPolicy == null) {
+ redeliveryPolicy = createRedeliveryPolicy();
+ }
+ return redeliveryPolicy;
+ }
+
+ /**
+ * Sets the redelivery policy
+ */
+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ public Logger getLogger() {
+ if (logger == null) {
+ logger = createLogger();
+ }
+ return logger;
+ }
+
+ public void setLogger(Logger logger) {
+ this.logger = logger;
+ }
+
+ /**
+ * Sets the exception policy strategy to use for resolving the {@link org.apache.camel.model.OnExceptionDefinition}
+ * to use for a given thrown exception
+ */
+ public ExceptionPolicyStrategy getExceptionPolicyStrategy() {
+ return exceptionPolicyStrategy;
+ }
+
+ public void setExceptionPolicyStrategy(ExceptionPolicyStrategy exceptionPolicyStrategy) {
+ this.exceptionPolicyStrategy = exceptionPolicyStrategy;
+ }
+
+ public Processor getOnRedelivery() {
+ return onRedelivery;
+ }
+
+ public void setOnRedelivery(Processor onRedelivery) {
+ this.onRedelivery = onRedelivery;
+ }
+
+ public Predicate getHandledPolicy() {
+ if (handledPolicy == null) {
+ handledPolicy = createHandledPolicy();
+ }
+ return handledPolicy;
+ }
+
+ public void setHandledPolicy(Predicate handled) {
+ this.handledPolicy = handled;
+ }
+
+ /**
+ * Sets the handled using a boolean and thus easier to use for Spring XML configuration as well
+ */
+ public void setHandled(boolean handled) {
+ handled(handled);
+ }
+
+ public String getDeadLetterUri() {
+ return deadLetterUri;
+ }
+
+ public void setDeadLetterUri(String deadLetterUri) {
+ this.deadLetter = null;
+ this.deadLetterUri = deadLetterUri;
+ }
+
+ public Endpoint getDeadLetter() {
+ return deadLetter;
+ }
+
+ public void setDeadLetter(Endpoint deadLetter) {
+ this.deadLetter = deadLetter;
+ this.deadLetterUri = deadLetter.getEndpointUri();
+ }
+
+ public boolean isUseOriginalBody() {
+ return useOriginalBody;
+ }
+
+ public void setUseOriginalBody(boolean useOriginalBody) {
+ this.useOriginalBody = useOriginalBody;
+ }
+
+ protected Predicate createHandledPolicy() {
+ // should NOT be handled by default for default error handler
+ return PredicateBuilder.toPredicate(ExpressionBuilder.constantExpression(false));
+ }
+
+ protected RedeliveryPolicy createRedeliveryPolicy() {
+ RedeliveryPolicy policy = new RedeliveryPolicy();
+ policy.disableRedelivery();
+ policy.setRedeliverDelay(0);
+ policy.setLogStackTrace(false);
+ policy.setRetriesExhaustedLogLevel(LoggingLevel.OFF);
+ policy.setRetryAttemptedLogLevel(LoggingLevel.OFF);
+ return policy;
+ }
+
+ protected Logger createLogger() {
+ return new Logger(LogFactory.getLog(DefaultErrorHandler.class), LoggingLevel.ERROR);
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultErrorHandlerBuilder";
+ }
+
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java Mon Jun 15 04:55:49 2009
@@ -106,7 +106,13 @@
if (invocation == null) {
throw new IllegalStateException(
"No method invocation could be created, no maching method could be found on: " + bean);
+ } else {
+ // set method name if not explict given
+ if (method == null) {
+ method = invocation.getMethod().getName();
+ }
}
+
try {
Object value = invocation.proceed();
if (value != null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Mon Jun 15 04:55:49 2009
@@ -16,18 +16,9 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.RejectedExecutionException;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.MessageHelper;
-import org.apache.camel.util.ServiceHelper;
/**
* Implements a <a
@@ -37,63 +28,25 @@
*
* @version $Revision$
*/
-public class DeadLetterChannel extends ErrorHandlerSupport implements Processor {
-
- // TODO: Introduce option to allow async redelivery, eg to not block thread while delay
- // (eg the Timer task code). However we should consider using Channels that has internal
- // producer/consumer queues with "delayed" support so a redelivery is just to move an
- // exchange to this channel with the computed delay time
- // we need to provide option so end users can decide if they would like to spawn an async thread
- // or not. Also consider MEP as InOut does not work with async then as the original caller thread
- // is expecting a reply in the sync thread.
-
- // we can use a single shared static timer for async redeliveries
- private final Processor deadLetter;
- private final String deadLetterUri;
- private final Processor output;
- private final Processor redeliveryProcessor;
- private final RedeliveryPolicy redeliveryPolicy;
- private final Predicate handledPolicy;
- private final Logger logger;
- private final boolean useOriginalBodyPolicy;
+public class DeadLetterChannel extends RedeliveryErrorHandler {
- private class RedeliveryData {
- int redeliveryCounter;
- long redeliveryDelay;
- Predicate retryUntilPredicate;
-
- // default behavior which can be overloaded on a per exception basis
- RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
- Processor deadLetterQueue = deadLetter;
- Processor onRedeliveryProcessor = redeliveryProcessor;
- Predicate handledPredicate = handledPolicy;
- boolean useOriginalInBody = useOriginalBodyPolicy;
- }
-
/**
* Creates the dead letter channel.
*
* @param output outer processor that should use this dead letter channel
- * @param deadLetter the failure processor to send failed exchanges to
- * @param deadLetterUri an optional uri for logging purpose
- * @param redeliveryProcessor an optional processor to run before redelivert attempt
- * @param redeliveryPolicy policy for redelivery
* @param logger logger to use for logging failures and redelivery attempts
- * @param exceptionPolicyStrategy strategy for onException handling
+ * @param redeliveryProcessor an optional processor to run before redelivery attempt
+ * @param redeliveryPolicy policy for redelivery
* @param handledPolicy policy for handling failed exception that are moved to the dead letter queue
- * @param useOriginalBodyPolicy should the original IN body be moved to the dead letter queue or the current exchange IN body?
+ * @param exceptionPolicyStrategy strategy for onException handling
+ * @param deadLetter the failure processor to send failed exchanges to
+ * @param deadLetterUri an optional uri for logging purpose
+ * @param useOriginalBodyPolicy should the original IN body be moved to the dead letter queue or the current exchange IN body?
*/
- public DeadLetterChannel(Processor output, Processor deadLetter, String deadLetterUri, Processor redeliveryProcessor,
- RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy,
- Predicate handledPolicy, boolean useOriginalBodyPolicy) {
- this.output = output;
- this.deadLetter = deadLetter;
- this.deadLetterUri = deadLetterUri;
- this.redeliveryProcessor = redeliveryProcessor;
- this.redeliveryPolicy = redeliveryPolicy;
- this.logger = logger;
- this.handledPolicy = handledPolicy;
- this.useOriginalBodyPolicy = useOriginalBodyPolicy;
+ public DeadLetterChannel(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+ Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy,
+ Processor deadLetter, String deadLetterUri, boolean useOriginalBodyPolicy) {
+ super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy);
setExceptionPolicy(exceptionPolicyStrategy);
}
@@ -102,316 +55,4 @@
return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]";
}
- public boolean supportTransacted() {
- return false;
- }
-
- public void process(Exchange exchange) throws Exception {
- processErrorHandler(exchange, new RedeliveryData());
- }
-
- /**
- * Processes the exchange decorated with this dead letter channel.
- */
- protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) {
-
- while (true) {
- // we can't keep retrying if the route is being shutdown.
- if (!isRunAllowed()) {
- if (log.isDebugEnabled()) {
- log.debug("Rejected execution as we are not started for exchange: " + exchange);
- }
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- return;
- }
- }
-
- // do not handle transacted exchanges that failed as this error handler does not support it
- if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
- if (log.isDebugEnabled()) {
- log.debug("This error handler does not support transacted exchanges."
- + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
- }
- return;
- }
-
- // did previous processing caused an exception?
- if (exchange.getException() != null) {
- handleException(exchange, data);
- }
-
- // compute if we should redeliver or not
- boolean shouldRedeliver = shouldRedeliver(exchange, data);
- if (!shouldRedeliver) {
- // no then move it to the dead letter queue
- deliverToDeadLetterQueue(exchange, data);
- // and we are finished since the exchanged was moved to the dead letter queue
- return;
- }
-
- // if we are redelivering then sleep before trying again
- if (data.redeliveryCounter > 0) {
- prepareExchangeForRedelivery(exchange);
-
- // wait until we should redeliver
- try {
- data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
- } catch (InterruptedException e) {
- log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
- // continue from top
- continue;
- }
-
- // letting onRedeliver be executed
- deliverToRedeliveryProcessor(exchange, data);
- }
-
- // process the exchange
- try {
- output.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
- if (done) {
- return;
- }
- // error occurred so loop back around.....
- }
-
- }
-
- // Properties
- // -------------------------------------------------------------------------
-
- /**
- * Returns the output processor
- */
- public Processor getOutput() {
- return output;
- }
-
- /**
- * Returns the dead letter that message exchanges will be sent to if the
- * redelivery attempts fail
- */
- public Processor getDeadLetter() {
- return deadLetter;
- }
-
- public RedeliveryPolicy getRedeliveryPolicy() {
- return redeliveryPolicy;
- }
-
- public Logger getLogger() {
- return logger;
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
-
- private void prepareExchangeForRedelivery(Exchange exchange) {
- // okay we will give it another go so clear the exception so we can try again
- if (exchange.getException() != null) {
- exchange.setException(null);
- }
-
- // clear rollback flags
- exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
-
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
- }
-
- private void handleException(Exchange exchange, RedeliveryData data) {
- Throwable e = exchange.getException();
-
- // store the original caused exception in a property, so we can restore it later
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-
- // find the error handler to use (if any)
- OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
- if (exceptionPolicy != null) {
- data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
- data.handledPredicate = exceptionPolicy.getHandledPolicy();
- data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
- data.useOriginalInBody = exceptionPolicy.getUseOriginalBodyPolicy();
-
- // route specific failure handler?
- Processor processor = exceptionPolicy.getErrorHandler();
- if (processor != null) {
- data.deadLetterQueue = processor;
- }
- // route specific on redelivey?
- processor = exceptionPolicy.getOnRedelivery();
- if (processor != null) {
- data.onRedeliveryProcessor = processor;
- }
- }
-
- String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
- + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
- logFailedDelivery(true, exchange, msg, data, e);
-
- data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
- }
-
- /**
- * Gives an optional configure redelivery processor a chance to process before the Exchange
- * will be redelivered. This can be used to alter the Exchange.
- */
- private void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
- if (data.onRedeliveryProcessor == null) {
- return;
- }
-
- if (log.isTraceEnabled()) {
- log.trace("RedeliveryProcessor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
- }
-
- try {
- data.onRedeliveryProcessor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- log.trace("Redelivery processor done");
- }
-
- /**
- * All redelivery attempts failed so move the exchange to the dead letter queue
- */
- private void deliverToDeadLetterQueue(final Exchange exchange, final RedeliveryData data) {
- if (data.deadLetterQueue == null) {
- return;
- }
-
- // we did not success with the redelivery so now we let the failure processor handle it
- ExchangeHelper.setFailureHandled(exchange);
- // must decrement the redelivery counter as we didn't process the redelivery but is
- // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
- decrementRedeliveryCounter(exchange);
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
-
- // prepare original IN body if it should be moved instead of current body
- if (data.useOriginalInBody) {
- if (log.isTraceEnabled()) {
- log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body");
- }
-
- Object original = exchange.getUnitOfWork().getOriginalInBody();
- exchange.getIn().setBody(original);
- }
-
- if (log.isTraceEnabled()) {
- log.trace("DeadLetterQueue " + data.deadLetterQueue + " is processing Exchange: " + exchange);
- }
- try {
- data.deadLetterQueue.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- log.trace("DedLetterQueue processor done");
-
- prepareExchangeAfterMovedToDeadLetterQueue(exchange, data.handledPredicate);
-
- String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
- + ". Moved to the dead letter queue: " + data.deadLetterQueue;
- logFailedDelivery(false, exchange, msg, data, null);
- }
-
- private void prepareExchangeAfterMovedToDeadLetterQueue(Exchange exchange, Predicate handledPredicate) {
- if (handledPredicate == null || !handledPredicate.matches(exchange)) {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is not handled so its marked as failed: " + exchange);
- }
- // exception not handled, put exception back in the exchange
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.FALSE);
- exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
- } else {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is handled so its marked as not failed: " + exchange);
- }
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE);
- }
- }
-
- private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
- LoggingLevel newLogLevel;
- if (shouldRedeliver) {
- newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
- } else {
- newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
- }
- if (exchange.isRollbackOnly()) {
- String msg = "Rollback exchange";
- if (exchange.getException() != null) {
- msg = msg + " due: " + exchange.getException().getMessage();
- }
- if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
- // log intented rollback on maximum WARN level (no ERROR or FATAL)
- logger.log(msg, LoggingLevel.WARN);
- } else {
- // otherwise use the desired logging level
- logger.log(msg, newLogLevel);
- }
- } else if (data.currentRedeliveryPolicy.isLogStackTrace() && e != null) {
- logger.log(message, e, newLogLevel);
- } else {
- logger.log(message, newLogLevel);
- }
- }
-
- private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
- return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
- }
-
- /**
- * Increments the redelivery counter and adds the redelivered flag if the
- * message has been redelivered
- */
- private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
- Message in = exchange.getIn();
- Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
- int next = 1;
- if (counter != null) {
- next = counter + 1;
- }
- in.setHeader(Exchange.REDELIVERY_COUNTER, next);
- in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
- return next;
- }
-
- /**
- * Prepares the redelivery counter and boolean flag for the failure handle processor
- */
- private void decrementRedeliveryCounter(Exchange exchange) {
- Message in = exchange.getIn();
- Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
- if (counter != null) {
- int prev = counter - 1;
- in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
- // set boolean flag according to counter
- in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
- } else {
- // not redelivered
- in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
- in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
- }
- }
-
- @Override
- protected void doStart() throws Exception {
- ServiceHelper.startServices(output, deadLetter);
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopServices(deadLetter, output);
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Mon Jun 15 04:55:49 2009
@@ -16,25 +16,30 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.MessageHelper;
-import org.apache.camel.util.ServiceHelper;
/**
* Default error handler
*
* @version $Revision$
*/
-public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor {
- private Processor output;
+public class DefaultErrorHandler extends RedeliveryErrorHandler {
- public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) {
- this.output = output;
+ /**
+ * Creates the dead letter channel.
+ *
+ * @param output outer processor that should use this default error handler
+ * @param logger logger to use for logging failures and redelivery attempts
+ * @param redeliveryProcessor an optional processor to run before redelivery attempt
+ * @param redeliveryPolicy policy for redelivery
+ * @param handledPolicy policy for handling failed exception that are moved to the dead letter queue
+ * @param exceptionPolicyStrategy strategy for onException handling
+ */
+ public DefaultErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+ Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) {
+ super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false);
setExceptionPolicy(exceptionPolicyStrategy);
}
@@ -43,97 +48,4 @@
return "DefaultErrorHandler[" + output + "]";
}
- public boolean supportTransacted() {
- return false;
- }
-
- public void process(Exchange exchange) throws Exception {
- try {
- output.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- // do not handle transacted exchanges as this error handler does not support it
- boolean handle = true;
- if (exchange.isTransacted() && !supportTransacted()) {
- handle = false;
- if (log.isDebugEnabled()) {
- log.debug("This error handler does not support transacted exchanges."
- + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
- }
- }
-
- if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
- handleException(exchange);
- }
- }
-
- private void handleException(Exchange exchange) throws Exception {
- Exception e = exchange.getException();
-
- // store the original caused exception in a property, so we can restore it later
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-
- // find the error handler to use (if any)
- OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
- if (exceptionPolicy != null) {
- Predicate handledPredicate = exceptionPolicy.getHandledPolicy();
-
- Processor processor = exceptionPolicy.getErrorHandler();
- prepareExchangeBeforeOnException(exchange);
- if (processor != null) {
- deliverToFaultProcessor(exchange, processor);
- }
- prepareExchangeAfterOnException(exchange, handledPredicate);
- }
- }
-
- private void prepareExchangeBeforeOnException(Exchange exchange) {
- // okay lower the exception as we are handling it by onException
- if (exchange.getException() != null) {
- exchange.setException(null);
- }
-
- // clear rollback flags
- exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
-
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
- }
-
- private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception {
- failureProcessor.process(exchange);
- }
-
- private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) {
- if (handledPredicate == null || !handledPredicate.matches(exchange)) {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is not handled so its marked as failed: " + exchange);
- }
- // exception not handled, put exception back in the exchange
- exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
- } else {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is handled so its marked as not failed: " + exchange);
- }
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE);
- }
- }
-
- /**
- * Returns the output processor
- */
- public Processor getOutput() {
- return output;
- }
-
- protected void doStart() throws Exception {
- ServiceHelper.startServices(output);
- }
-
- protected void doStop() throws Exception {
- ServiceHelper.stopServices(output);
- }
-
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=784652&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 15 04:55:49 2009
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.model.OnExceptionDefinition;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * Base redeliverable error handler that also suppors a final dead letter queue in case
+ * all redelivery attempts fail.
+ * <p/>
+ * This implementation should contain all the error handling logic and the sub classes
+ * should only configure it according to what they support.
+ *
+ * @version $Revision$
+ */
+public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements Processor {
+
+ // TODO: support onException being able to use other onException to route they exceptions
+ // (hard one to get working, has not been supported before)
+
+ // TODO: the while loop method should be refactored a bit so its more higher level so the
+ // code is easier to read and understand
+
+ // TODO: add support for onRedeliver(SocketException.class) to allow running som custom
+ // route when this given exception is being redelivered (create ticket and add in 2.1)
+
+ protected final Processor deadLetter;
+ protected final String deadLetterUri;
+ protected final Processor output;
+ protected final Processor redeliveryProcessor;
+ protected final RedeliveryPolicy redeliveryPolicy;
+ protected final Predicate handledPolicy;
+ protected final Logger logger;
+ protected final boolean useOriginalBodyPolicy;
+
+ protected class RedeliveryData {
+ int redeliveryCounter;
+ long redeliveryDelay;
+ Predicate retryUntilPredicate;
+
+ // default behavior which can be overloaded on a per exception basis
+ RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
+ Processor failureProcessor = deadLetter;
+ Processor onRedeliveryProcessor = redeliveryProcessor;
+ Predicate handledPredicate = handledPolicy;
+ boolean useOriginalInBody = useOriginalBodyPolicy;
+ }
+
+ public RedeliveryErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor,
+ RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
+ String deadLetterUri, boolean useOriginalBodyPolicy) {
+ this.redeliveryProcessor = redeliveryProcessor;
+ this.deadLetter = deadLetter;
+ this.output = output;
+ this.redeliveryPolicy = redeliveryPolicy;
+ this.logger = logger;
+ this.deadLetterUri = deadLetterUri;
+ this.handledPolicy = handledPolicy;
+ this.useOriginalBodyPolicy = useOriginalBodyPolicy;
+ }
+
+ public boolean supportTransacted() {
+ return false;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ processErrorHandler(exchange, new RedeliveryData());
+ }
+
+ /**
+ * Processes the exchange decorated with this dead letter channel.
+ */
+ protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) {
+
+ while (true) {
+ // we can't keep retrying if the route is being shutdown.
+ if (!isRunAllowed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Rejected execution as we are not started for exchange: " + exchange);
+ }
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ return;
+ }
+ }
+
+ // do not handle transacted exchanges that failed as this error handler does not support it
+ if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("This error handler does not support transacted exchanges."
+ + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
+ }
+ return;
+ }
+
+ // did previous processing caused an exception?
+ if (exchange.getException() != null) {
+ handleException(exchange, data);
+ }
+
+ // compute if we should redeliver or not
+ boolean shouldRedeliver = shouldRedeliver(exchange, data);
+ if (!shouldRedeliver) {
+ // TODO: divde into onException and deadLetterQueue
+ // no then move it to the dead letter queue
+ deliverToFailureProcessor(exchange, data);
+ // prepare the exchange for failure
+ prepareExchangeAfterFailure(exchange, data);
+ // we could not process the exchange succesfully so break
+ return;
+ }
+
+ // if we are redelivering then sleep before trying again
+ if (data.redeliveryCounter > 0) {
+ prepareExchangeForRedelivery(exchange);
+
+ // wait until we should redeliver
+ try {
+ data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
+ } catch (InterruptedException e) {
+ log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+ // continue from top
+ continue;
+ }
+
+ // letting onRedeliver be executed
+ deliverToRedeliveryProcessor(exchange, data);
+ }
+
+ // process the exchange
+ try {
+ output.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
+ if (done) {
+ return;
+ }
+ // error occurred so loop back around.....
+ }
+
+ }
+
+ /**
+ * Returns the output processor
+ */
+ public Processor getOutput() {
+ return output;
+ }
+
+ /**
+ * Returns the dead letter that message exchanges will be sent to if the
+ * redelivery attempts fail
+ */
+ public Processor getDeadLetter() {
+ return deadLetter;
+ }
+
+ public RedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public Logger getLogger() {
+ return logger;
+ }
+
+ protected void prepareExchangeForRedelivery(Exchange exchange) {
+ // okay we will give it another go so clear the exception so we can try again
+ if (exchange.getException() != null) {
+ exchange.setException(null);
+ }
+
+ // clear rollback flags
+ exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+ }
+
+ protected void handleException(Exchange exchange, RedeliveryData data) {
+ Throwable e = exchange.getException();
+
+ // store the original caused exception in a property, so we can restore it later
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+
+ // find the error handler to use (if any)
+ OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
+ if (exceptionPolicy != null) {
+ data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
+ data.handledPredicate = exceptionPolicy.getHandledPolicy();
+ data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
+ data.useOriginalInBody = exceptionPolicy.getUseOriginalBodyPolicy();
+
+ // route specific failure handler?
+ Processor processor = exceptionPolicy.getErrorHandler();
+ if (processor != null) {
+ data.failureProcessor = processor;
+ }
+ // route specific on redelivey?
+ processor = exceptionPolicy.getOnRedelivery();
+ if (processor != null) {
+ data.onRedeliveryProcessor = processor;
+ }
+ }
+
+ String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
+ logFailedDelivery(true, exchange, msg, data, e);
+
+ data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+ }
+
+ /**
+ * Gives an optional configure redelivery processor a chance to process before the Exchange
+ * will be redelivered. This can be used to alter the Exchange.
+ */
+ protected void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
+ if (data.onRedeliveryProcessor == null) {
+ return;
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivery processor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange
+ + " before its redelivered");
+ }
+
+ try {
+ data.onRedeliveryProcessor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ log.trace("Redelivery processor done");
+ }
+
+ /**
+ * All redelivery attempts failed so move the exchange to the dead letter queue
+ */
+ protected void deliverToFailureProcessor(final Exchange exchange, final RedeliveryData data) {
+ // we did not success with the redelivery so now we let the failure processor handle it
+ ExchangeHelper.setFailureHandled(exchange);
+ // must decrement the redelivery counter as we didn't process the redelivery but is
+ // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
+ decrementRedeliveryCounter(exchange);
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
+ if (data.failureProcessor != null) {
+ // prepare original IN body if it should be moved instead of current body
+ if (data.useOriginalInBody) {
+ if (log.isTraceEnabled()) {
+ log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body");
+ }
+
+ Object original = exchange.getUnitOfWork().getOriginalInBody();
+ exchange.getIn().setBody(original);
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Failure processor " + data.failureProcessor + " is processing Exchange: " + exchange);
+ }
+ try {
+ data.failureProcessor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ log.trace("Failure processor done");
+
+ String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". Processed by failure processor: " + data.failureProcessor;
+ logFailedDelivery(false, exchange, msg, data, null);
+ }
+ }
+
+ protected void prepareExchangeAfterFailure(Exchange exchange, final RedeliveryData data) {
+ Predicate handledPredicate = data.handledPredicate;
+
+ if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+ if (log.isDebugEnabled()) {
+ log.debug("This exchange is not handled so its marked as failed: " + exchange);
+ }
+ // exception not handled, put exception back in the exchange
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.FALSE);
+ exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("This exchange is handled so its marked as not failed: " + exchange);
+ }
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE);
+ }
+ }
+
+ private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
+ if (logger == null) {
+ return;
+ }
+
+ LoggingLevel newLogLevel;
+ if (shouldRedeliver) {
+ newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+ } else {
+ newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+ }
+ if (exchange.isRollbackOnly()) {
+ String msg = "Rollback exchange";
+ if (exchange.getException() != null) {
+ msg = msg + " due: " + exchange.getException().getMessage();
+ }
+ if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
+ // log intented rollback on maximum WARN level (no ERROR or FATAL)
+ logger.log(msg, LoggingLevel.WARN);
+ } else {
+ // otherwise use the desired logging level
+ logger.log(msg, newLogLevel);
+ }
+ } else if (data.currentRedeliveryPolicy.isLogStackTrace() && e != null) {
+ logger.log(message, e, newLogLevel);
+ } else {
+ logger.log(message, newLogLevel);
+ }
+ }
+
+ private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
+ return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
+ }
+
+ /**
+ * Increments the redelivery counter and adds the redelivered flag if the
+ * message has been redelivered
+ */
+ private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
+ Message in = exchange.getIn();
+ Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+ int next = 1;
+ if (counter != null) {
+ next = counter + 1;
+ }
+ in.setHeader(Exchange.REDELIVERY_COUNTER, next);
+ in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+ return next;
+ }
+
+ /**
+ * Prepares the redelivery counter and boolean flag for the failure handle processor
+ */
+ private void decrementRedeliveryCounter(Exchange exchange) {
+ Message in = exchange.getIn();
+ Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+ if (counter != null) {
+ int prev = counter - 1;
+ in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
+ // set boolean flag according to counter
+ in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
+ } else {
+ // not redelivered
+ in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
+ in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(output, deadLetter);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(deadLetter, output);
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Mon Jun 15 04:55:49 2009
@@ -17,9 +17,9 @@
package org.apache.camel.processor.idempotent;
import org.apache.camel.Exchange;
-import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java Mon Jun 15 04:55:49 2009
@@ -37,7 +37,7 @@
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.maximumRedeliveries(1);
redeliveryPolicy.setUseExponentialBackOff(true);
- DeadLetterChannelBuilder deadLetterChannelBuilder = new DeadLetterChannelBuilder();
+ DeadLetterChannelBuilder deadLetterChannelBuilder = new DeadLetterChannelBuilder("mock:error");
deadLetterChannelBuilder.setRedeliveryPolicy(redeliveryPolicy);
context.setErrorHandlerBuilder(deadLetterChannelBuilder);
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java Mon Jun 15 04:55:49 2009
@@ -168,7 +168,7 @@
.to(ERROR_QUEUE);
onException(IOException.class)
- .redeliveryDelay(1000L)
+ .redeliveryDelay(100L)
.maximumRedeliveries(3)
.maximumRedeliveryDelay(30 * 1000L)
.backOffMultiplier(1.0)
@@ -177,13 +177,13 @@
.to(ERROR_QUEUE);
onException(Exception.class)
- .redeliveryDelay(1000L)
+ .redeliveryDelay(100L)
.maximumRedeliveries(2)
.setHeader(MESSAGE_INFO, constant("Damm just exception"))
.to(ERROR_QUEUE);
onException(MyBaseBusinessException.class)
- .redeliveryDelay(1000L)
+ .redeliveryDelay(100L)
.maximumRedeliveries(3)
.setHeader(MESSAGE_INFO, constant("Damm my business is not going to well"))
.to(BUSINESS_ERROR_QUEUE);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithRetryLoggingLevelSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithRetryLoggingLevelSetTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithRetryLoggingLevelSetTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithRetryLoggingLevelSetTest.java Mon Jun 15 04:55:49 2009
@@ -102,7 +102,7 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- errorHandler(deadLetterChannel().log(getCustomLog()));
+ errorHandler(deadLetterChannel("mock:error").log(getCustomLog()));
onException(NullPointerException.class)
.maximumRedeliveries(0)
@@ -111,7 +111,7 @@
// START SNIPPET: exceptionBuilder1
onException(IOException.class)
- .redeliveryDelay(1000L)
+ .redeliveryDelay(100L)
.maximumRedeliveries(3)
.maximumRedeliveryDelay(10000L)
.backOffMultiplier(1.0)
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInfoSelectMethodTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInfoSelectMethodTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInfoSelectMethodTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInfoSelectMethodTest.java Mon Jun 15 04:55:49 2009
@@ -58,7 +58,7 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- errorHandler(deadLetterChannel().logStackTrace(false).maximumRedeliveries(3));
+ errorHandler(deadLetterChannel("mock:error").logStackTrace(false).maximumRedeliveries(3));
onException(Exception.class).handled(true).beanRef("foo", "handleFailure").to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/DefaultParameterMappingStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/DefaultParameterMappingStrategyTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/DefaultParameterMappingStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/DefaultParameterMappingStrategyTest.java Mon Jun 15 04:55:49 2009
@@ -70,7 +70,7 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- errorHandler(deadLetterChannel().logStackTrace(false).disableRedelivery());
+ errorHandler(deadLetterChannel("mock:error").logStackTrace(false).disableRedelivery());
onException(Exception.class).handled(true).beanRef("foo", "withException").to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelUseOriginalInBodyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelUseOriginalInBodyTest.java?rev=784652&r1=784651&r2=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelUseOriginalInBodyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelUseOriginalInBodyTest.java Mon Jun 15 04:55:49 2009
@@ -19,7 +19,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.builder.DeadLetterChannelBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -54,11 +54,11 @@
@Override
public void configure() throws Exception {
// will use original
- DeadLetterChannelBuilder a = deadLetterChannel("mock:a")
+ ErrorHandlerBuilder a = deadLetterChannel("mock:a")
.maximumRedeliveries(2).redeliverDelay(0).logStackTrace(false).useOriginalBody().handled(true);
// will NOT use original
- DeadLetterChannelBuilder b = deadLetterChannel("mock:b")
+ ErrorHandlerBuilder b = deadLetterChannel("mock:b")
.maximumRedeliveries(2).redeliverDelay(0).logStackTrace(false).handled(true);
from("direct:a")
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerRedeliveryTest.java (from r784508, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerRedeliveryTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerRedeliveryTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java&r1=784508&r2=784652&rev=784652&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerRedeliveryTest.java Mon Jun 15 04:55:49 2009
@@ -21,72 +21,46 @@
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
/**
* Unit test to verify that redelivery counters is working as expected.
*/
-public class DeadLetterChannelRedeliveryTest extends ContextTestSupport {
+public class DefaultErrorHandlerRedeliveryTest extends ContextTestSupport {
private static int counter;
public void testRedeliveryTest() throws Exception {
counter = 0;
- // We expect the exchange here after 1 delivery and 2 re-deliveries
- MockEndpoint mock = getMockEndpoint("mock:error");
- mock.expectedMessageCount(1);
- mock.message(0).header("CamelRedelivered").isEqualTo(Boolean.TRUE);
- mock.message(0).header("CamelRedeliveryCounter").isEqualTo(2);
-
try {
template.sendBody("direct:start", "Hello World");
} catch (RuntimeCamelException e) {
// expected
}
- assertMockEndpointsSatisfied();
-
assertEquals(3, counter); // One call + 2 re-deliveries
}
public void testNoRedeliveriesTest() throws Exception {
counter = 0;
- // We expect the exchange here after 1 delivery
- MockEndpoint mock = getMockEndpoint("mock:no");
- mock.expectedMessageCount(1);
- mock.message(0).header("CamelRedelivered").isEqualTo(Boolean.FALSE);
- mock.message(0).header("CamelRedeliveryCounter").isEqualTo(0);
-
try {
template.sendBody("direct:no", "Hello World");
} catch (RuntimeCamelException e) {
// expected
}
- assertMockEndpointsSatisfied();
-
assertEquals(1, counter); // One call
}
public void testOneRedeliveryTest() throws Exception {
counter = 0;
-
- // We expect the exchange here after 1 delivery and 1 re delivery
- MockEndpoint mock = getMockEndpoint("mock:one");
- mock.expectedMessageCount(1);
- mock.message(0).header("CamelRedelivered").isEqualTo(Boolean.TRUE);
- mock.message(0).header("CamelRedeliveryCounter").isEqualTo(1);
-
try {
template.sendBody("direct:one", "Hello World");
} catch (RuntimeCamelException e) {
// expected
}
- assertMockEndpointsSatisfied();
-
assertEquals(2, counter); // One call + 1 re-delivery
}
@@ -95,7 +69,7 @@
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:start")
- .errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2))
+ .errorHandler(defaultErrorHandler().maximumRedeliveries(2))
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
counter++;
@@ -104,7 +78,6 @@
});
from("direct:no")
- .errorHandler(deadLetterChannel("mock:no").maximumRedeliveries(0))
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
counter++;
@@ -113,7 +86,7 @@
});
from("direct:one")
- .errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1))
+ .errorHandler(defaultErrorHandler().maximumRedeliveries(1))
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
counter++;
@@ -124,4 +97,4 @@
};
}
-}
+}
\ No newline at end of file