You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/06 07:17:47 UTC

[camel] 01/09: CAMEL-16834: error handler in model DSL. WIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch errorhandler-in-dsl
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a2798d74e3582034d34ab6fee73a2348d0f0347
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 5 16:09:07 2022 +0200

    CAMEL-16834: error handler in model DSL. WIP
---
 .../errorhandler/DeadLetterChannelDefinition.java  |  21 +-
 .../DefaultErrorHandlerDefinition.java             | 382 ++++++++++++++++++++-
 2 files changed, 401 insertions(+), 2 deletions(-)

diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelDefinition.java
index b541bfa511f..75a01a5804e 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelDefinition.java
@@ -32,7 +32,6 @@ import org.apache.camel.spi.Metadata;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class DeadLetterChannelDefinition extends DefaultErrorHandlerDefinition {
 
-    // TODO: fluent builders
     // TODO: label, java type, ref
 
     @XmlAttribute(required = true)
@@ -89,4 +88,24 @@ public class DeadLetterChannelDefinition extends DefaultErrorHandlerDefinition {
         super.cloneBuilder(other);
     }
 
+    /**
+     * Whether the dead letter channel should handle (and ignore) any new exception that may been thrown during sending
+     * the message to the dead letter endpoint.
+     * <p/>
+     * The default value is <tt>true</tt> which means any such kind of exception is handled and ignored. Set this to
+     * <tt>false</tt> to let the exception be propagated back on the {@link org.apache.camel.Exchange}. This can be used
+     * in situations where you use transactions, and want to use Camel's dead letter channel to deal with exceptions
+     * during routing, but if the dead letter channel itself fails because of a new exception being thrown, then by
+     * setting this to <tt>false</tt> the new exceptions is propagated back and set on the
+     * {@link org.apache.camel.Exchange}, which allows the transaction to detect the exception, and rollback.
+     *
+     * @param  handleNewException <tt>true</tt> to handle (and ignore), <tt>false</tt> to catch and propagated the
+     *                            exception on the {@link org.apache.camel.Exchange}
+     * @return                    the builder
+     */
+    public DefaultErrorHandlerDefinition deadLetterHandleNewException(boolean handleNewException) {
+        setDeadLetterHandleNewException(handleNewException ? "true" : "false");
+        return this;
+    }
+
 }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
index 68e2ce03d5f..a2f3d918ed8 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java
@@ -25,14 +25,18 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
+import org.apache.camel.Expression;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.model.RedeliveryPolicyDefinition;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
 import org.apache.camel.processor.errorhandler.RedeliveryPolicy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.Metadata;
+import org.apache.camel.support.ExpressionToPredicateAdapter;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default error handler.
@@ -42,7 +46,6 @@ import org.apache.camel.spi.Metadata;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition implements ErrorHandlerBuilder {
 
-    // TODO: fluent builders
     // TODO: label, java type, ref
 
     @XmlTransient
@@ -401,4 +404,381 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition im
     public void setRedeliveryPolicy(RedeliveryPolicyDefinition redeliveryPolicy) {
         this.redeliveryPolicy = redeliveryPolicy;
     }
+
+    // Builder methods
+    // -------------------------------------------------------------------------
+    public DefaultErrorHandlerDefinition backOffMultiplier(double backOffMultiplier) {
+        getRedeliveryPolicy().backOffMultiplier(backOffMultiplier);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition collisionAvoidancePercent(double collisionAvoidancePercent) {
+        getRedeliveryPolicy().collisionAvoidancePercent(collisionAvoidancePercent);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition redeliveryDelay(long delay) {
+        getRedeliveryPolicy().redeliveryDelay(delay);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition delayPattern(String delayPattern) {
+        getRedeliveryPolicy().delayPattern(delayPattern);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition maximumRedeliveries(int maximumRedeliveries) {
+        getRedeliveryPolicy().maximumRedeliveries(maximumRedeliveries);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition disableRedelivery() {
+        getRedeliveryPolicy().maximumRedeliveries(0);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition maximumRedeliveryDelay(long maximumRedeliveryDelay) {
+        getRedeliveryPolicy().maximumRedeliveryDelay(maximumRedeliveryDelay);
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition useCollisionAvoidance() {
+        getRedeliveryPolicy().useCollisionAvoidance();
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition useExponentialBackOff() {
+        getRedeliveryPolicy().useExponentialBackOff();
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
+        getRedeliveryPolicy().setRetriesExhaustedLogLevel(retriesExhaustedLogLevel.name());
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
+        getRedeliveryPolicy().setRetryAttemptedLogLevel(retryAttemptedLogLevel.name());
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition retryAttemptedLogInterval(int retryAttemptedLogInterval) {
+        getRedeliveryPolicy().setRetryAttemptedLogInterval(String.valueOf(retryAttemptedLogInterval));
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logStackTrace(boolean logStackTrace) {
+        getRedeliveryPolicy().setLogStackTrace(logStackTrace ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logRetryStackTrace(boolean logRetryStackTrace) {
+        getRedeliveryPolicy().setLogRetryStackTrace(logRetryStackTrace ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logHandled(boolean logHandled) {
+        getRedeliveryPolicy().setLogHandled(logHandled ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logNewException(boolean logNewException) {
+        getRedeliveryPolicy().setLogNewException(logNewException ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logExhausted(boolean logExhausted) {
+        getRedeliveryPolicy().setLogExhausted(logExhausted ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logRetryAttempted(boolean logRetryAttempted) {
+        getRedeliveryPolicy().setLogRetryAttempted(logRetryAttempted ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logExhaustedMessageHistory(boolean logExhaustedMessageHistory) {
+        getRedeliveryPolicy().setLogExhaustedMessageHistory(logExhaustedMessageHistory ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition logExhaustedMessageBody(boolean logExhaustedMessageBody) {
+        getRedeliveryPolicy().setLogExhaustedMessageBody(logExhaustedMessageBody ? "true" : "false");
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition exchangeFormatterRef(String exchangeFormatterRef) {
+        getRedeliveryPolicy().setExchangeFormatterRef(exchangeFormatterRef);
+        return this;
+    }
+
+    /**
+     * Will allow asynchronous delayed redeliveries. The route, in particular the consumer's component, must support the
+     * Asynchronous Routing Engine (e.g. seda)
+     *
+     * @see    RedeliveryPolicy#setAsyncDelayedRedelivery(boolean)
+     * @return the builder
+     */
+    public DefaultErrorHandlerDefinition asyncDelayedRedelivery() {
+        getRedeliveryPolicy().setAsyncDelayedRedelivery("true");
+        return this;
+    }
+
+    /**
+     * Controls whether to allow redelivery while stopping/shutting down a route that uses error handling.
+     *
+     * @param  allowRedeliveryWhileStopping <tt>true</tt> to allow redelivery, <tt>false</tt> to reject redeliveries
+     * @return                              the builder
+     */
+    public DefaultErrorHandlerDefinition allowRedeliveryWhileStopping(boolean allowRedeliveryWhileStopping) {
+        getRedeliveryPolicy().setAllowRedeliveryWhileStopping(allowRedeliveryWhileStopping ? "true" : "false");
+        return this;
+    }
+
+    /**
+     * Sets the thread pool to be used for redelivery.
+     *
+     * @param  executorService the scheduled thread pool to use
+     * @return                 the builder.
+     */
+    public DefaultErrorHandlerDefinition executorService(ScheduledExecutorService executorService) {
+        setExecutorServiceBean(executorService);
+        return this;
+    }
+
+    /**
+     * Sets a reference to a thread pool to be used for redelivery.
+     *
+     * @param  ref reference to a scheduled thread pool
+     * @return     the builder.
+     */
+    public DefaultErrorHandlerDefinition executorServiceRef(String ref) {
+        setExecutorServiceRef(ref);
+        return this;
+    }
+
+    /**
+     * Sets the logger used for caught exceptions
+     *
+     * @param  logger the logger
+     * @return        the builder
+     */
+    public DefaultErrorHandlerDefinition logger(CamelLogger logger) {
+        setLoggerBean(logger);
+        return this;
+    }
+
+    /**
+     * Sets the logging level of exceptions caught
+     *
+     * @param  level the logging level
+     * @return       the builder
+     */
+    public DefaultErrorHandlerDefinition loggingLevel(LoggingLevel level) {
+        setLevel(level);
+        return this;
+    }
+
+    /**
+     * Sets the log used for caught exceptions
+     *
+     * @param  log the logger
+     * @return     the builder
+     */
+    public DefaultErrorHandlerDefinition log(org.slf4j.Logger log) {
+        if (loggerBean == null) {
+            loggerBean = new CamelLogger(LoggerFactory.getLogger(DefaultErrorHandler.class), LoggingLevel.ERROR);
+        }
+        loggerBean.setLog(log);
+        return this;
+    }
+
+    /**
+     * Sets the log used for caught exceptions
+     *
+     * @param  log the log name
+     * @return     the builder
+     */
+    public DefaultErrorHandlerDefinition log(String log) {
+        return log(LoggerFactory.getLogger(log));
+    }
+
+    /**
+     * Sets the log used for caught exceptions
+     *
+     * @param  log the log class
+     * @return     the builder
+     */
+    public DefaultErrorHandlerDefinition log(Class<?> log) {
+        return log(LoggerFactory.getLogger(log));
+    }
+
+    /**
+     * Sets a processor that should be processed <b>before</b> a redelivery attempt.
+     * <p/>
+     * Can be used to change the {@link org.apache.camel.Exchange} <b>before</b> its being redelivered.
+     *
+     * @param  processor the processor
+     * @return           the builder
+     */
+    public DefaultErrorHandlerDefinition onRedelivery(Processor processor) {
+        setOnRedeliveryProcessor(processor);
+        return this;
+    }
+
+    /**
+     * Sets a reference for the processor to use <b>before</b> a redelivery attempt.
+     *
+     * @param  onRedeliveryRef the processor's reference
+     * @return                 the builder
+     * @see                    #onRedelivery(Processor)
+     */
+    public DefaultErrorHandlerDefinition onRedeliveryRef(String onRedeliveryRef) {
+        setOnRedeliveryRef(onRedeliveryRef);
+        return this;
+    }
+
+    /**
+     * Sets the retry while expression.
+     * <p/>
+     * Will continue retrying until expression evaluates to <tt>false</tt>.
+     *
+     * @param  retryWhile expression that determines when to stop retrying
+     * @return            the builder
+     */
+    public DefaultErrorHandlerDefinition retryWhile(Expression retryWhile) {
+        setRetryWhilePredicate(ExpressionToPredicateAdapter.toPredicate(retryWhile));
+        return this;
+    }
+
+    public DefaultErrorHandlerDefinition retryWhileRef(String retryWhileRef) {
+        setRetryWhileRef(retryWhileRef);
+        return this;
+    }
+
+    /**
+     * Will use the original input {@link org.apache.camel.Message} (original body and headers) when an
+     * {@link org.apache.camel.Exchange} is moved to the dead letter queue.
+     * <p/>
+     * <b>Notice:</b> this only applies when all redeliveries attempt have failed and the
+     * {@link org.apache.camel.Exchange} is doomed for failure. <br/>
+     * Instead of using the current inprogress {@link org.apache.camel.Exchange} IN message we use the original IN
+     * message instead. This allows you to store the original input in the dead letter queue instead of the inprogress
+     * snapshot of the IN message. For instance if you route transform the IN body during routing and then failed. With
+     * the original exchange store in the dead letter queue it might be easier to manually re submit the
+     * {@link org.apache.camel.Exchange} again as the IN message is the same as when Camel received it. So you should be
+     * able to send the {@link org.apache.camel.Exchange} to the same input.
+     * <p/>
+     * The difference between useOriginalMessage and useOriginalBody is that the former includes both the original body
+     * and headers, where as the latter only includes the original body. You can use the latter to enrich the message
+     * with custom headers and include the original message body. The former wont let you do this, as its using the
+     * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
+     * <p/>
+     * <b>Important:</b> The original input means the input message that are bounded by the current
+     * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they
+     * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints
+     * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+     * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+     * for the messages in their sub-route (eg the split message); however these EIPs have an option named
+     * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and
+     * therefore use the parent original message.
+     * <p/>
+     * By default this feature is off.
+     *
+     * @return the builder
+     * @see    #useOriginalBody()
+     */
+    public DefaultErrorHandlerDefinition useOriginalMessage() {
+        setUseOriginalMessage("true");
+        return this;
+    }
+
+    /**
+     * Will use the original input {@link org.apache.camel.Message} body (original body only) when an
+     * {@link org.apache.camel.Exchange} is moved to the dead letter queue.
+     * <p/>
+     * <b>Notice:</b> this only applies when all redeliveries attempt have failed and the
+     * {@link org.apache.camel.Exchange} is doomed for failure. <br/>
+     * Instead of using the current inprogress {@link org.apache.camel.Exchange} IN message we use the original IN
+     * message instead. This allows you to store the original input in the dead letter queue instead of the inprogress
+     * snapshot of the IN message. For instance if you route transform the IN body during routing and then failed. With
+     * the original exchange store in the dead letter queue it might be easier to manually re submit the
+     * {@link org.apache.camel.Exchange} again as the IN message is the same as when Camel received it. So you should be
+     * able to send the {@link org.apache.camel.Exchange} to the same input.
+     * <p/>
+     * The difference between useOriginalMessage and useOriginalBody is that the former includes both the original body
+     * and headers, where as the latter only includes the original body. You can use the latter to enrich the message
+     * with custom headers and include the original message body. The former wont let you do this, as its using the
+     * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
+     * <p/>
+     * <b>Important:</b> The original input means the input message that are bounded by the current
+     * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they
+     * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints
+     * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as
+     * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary
+     * for the messages in their sub-route (eg the split message); however these EIPs have an option named
+     * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and
+     * therefore use the parent original message.
+     * <p/>
+     * By default this feature is off.
+     *
+     * @return the builder
+     * @see    #useOriginalMessage()
+     */
+    public DefaultErrorHandlerDefinition useOriginalBody() {
+        setUseOriginalBody("true");
+        return this;
+    }
+
+    /**
+     * Sets a custom {@link org.apache.camel.Processor} to prepare the {@link org.apache.camel.Exchange} before handled
+     * by the failure processor / dead letter channel. This allows for example to enrich the message before sending to a
+     * dead letter queue.
+     *
+     * @param  processor the processor
+     * @return           the builder
+     */
+    public DefaultErrorHandlerDefinition onPrepareFailure(Processor processor) {
+        setOnPrepareFailureProcessor(processor);
+        return this;
+    }
+
+    /**
+     * Sets a reference for the processor to use before handled by the failure processor.
+     *
+     * @param  onPrepareFailureRef the processor's reference
+     * @return                     the builder
+     * @see                        #onPrepareFailure(Processor)
+     */
+    public DefaultErrorHandlerDefinition onPrepareFailureRef(String onPrepareFailureRef) {
+        setOnPrepareFailureRef(onPrepareFailureRef);
+        return this;
+    }
+
+    /**
+     * Sets a custom {@link org.apache.camel.Processor} to process the {@link org.apache.camel.Exchange} just after an
+     * exception was thrown. This allows to execute the processor at the same time the exception was thrown.
+     * <p/>
+     * Important: Any exception thrown from this processor will be ignored.
+     *
+     * @param  processor the processor
+     * @return           the builder
+     */
+    public DefaultErrorHandlerDefinition onExceptionOccurred(Processor processor) {
+        setOnExceptionOccurredProcessor(processor);
+        return this;
+    }
+
+    /**
+     * Sets a reference for the processor to use just after an exception was thrown.
+     *
+     * @param  onExceptionOccurredRef the processor's reference
+     * @return                        the builder
+     * @see                           #onExceptionOccurred(Processor)
+     */
+    public DefaultErrorHandlerDefinition onExceptionOccurredRef(String onExceptionOccurredRef) {
+        setOnExceptionOccurredRef(onExceptionOccurredRef);
+        return this;
+    }
+
 }