You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/14 12:53:55 UTC

[camel] 10/23: CAMEL-15105: rework redelivery handling

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ceaffab948b9ab441b984a1fabaaf3731ba0996e
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Nov 22 18:28:49 2022 +0100

    CAMEL-15105: rework redelivery handling
---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java         | 2 +-
 .../org/apache/camel/component/resilience4j/ResilienceProcessor.java | 3 +--
 .../java/org/apache/camel/impl/engine/CamelInternalProcessor.java    | 4 ++--
 .../src/main/java/org/apache/camel/processor/CatchProcessor.java     | 4 ++--
 .../src/main/java/org/apache/camel/processor/MulticastProcessor.java | 2 +-
 .../main/java/org/apache/camel/processor/OnCompletionProcessor.java  | 2 +-
 .../src/main/java/org/apache/camel/processor/PollEnricher.java       | 2 +-
 .../org/apache/camel/processor/aggregate/AggregateProcessor.java     | 5 ++---
 .../java/org/apache/camel/processor/errorhandler/NoErrorHandler.java | 3 +--
 .../apache/camel/support/BridgeExceptionHandlerToErrorHandler.java   | 3 +--
 10 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 6c148e068ea..30b8cc538d7 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -548,7 +548,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             exchange.setException(null);
             // and we should not be regarded as exhausted as we are in a try ..
             // catch block
-            exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+            exchange.getExchangeExtension().setRedeliveryExhausted(false);
             // run the fallback processor
             try {
                 LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange);
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 83234a6b165..97a8aa9a17e 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -41,7 +41,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
@@ -683,7 +682,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             exchange.setException(null);
             // and we should not be regarded as exhausted as we are in a try ..
             // catch block
-            exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+            exchange.getExchangeExtension().setRedeliveryExhausted(false);
             // run the fallback processor
             try {
                 LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index ff2e6c51067..05e7b3d1d17 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -961,7 +961,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
                 exchange.setException(tce);
                 // because this is stream caching error then we cannot use redelivery as the message body is corrupt
                 // so mark as redelivery exhausted
-                exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+                exchange.getExchangeExtension().setRedeliveryExhausted(true);
             }
             // check if we somewhere failed due to a stream caching exception
             Throwable cause = exchange.getException();
@@ -987,7 +987,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
                     exchange.setException(tce);
                     // because this is stream caching error then we cannot use redelivery as the message body is corrupt
                     // so mark as redelivery exhausted
-                    exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+                    exchange.getExchangeExtension().setRedeliveryExhausted(true);
                 }
             }
             return null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index b13fc20f890..64be8c6814a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -115,7 +115,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
         exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, e);
         exchange.setException(null);
         // and we should not be regarded as exhausted as we are in a try .. catch block
-        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+        exchange.getExchangeExtension().setRedeliveryExhausted(false);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("The exception is handled for the exception: {} caused by: {}",
@@ -131,7 +131,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
                 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, false, null);
 
                 // always clear redelivery exhausted in a catch clause
-                exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+                exchange.getExchangeExtension().setRedeliveryExhausted(false);
 
                 if (rollbackOnly || rollbackOnlyLast || stop) {
                     exchange.setRouteStop(stop);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index af740f9b1c2..d4602468844 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -827,7 +827,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
             // multicast uses error handling on its output processors and they have tried to redeliver
             // so we shall signal back to the other error handlers that we are exhausted and they should not
             // also try to redeliver as we would then do that twice
-            original.adapt(ExtendedExchange.class).setRedeliveryExhausted(exhaust);
+            original.getExchangeExtension().setRedeliveryExhausted(exhaust);
         }
 
         reactiveExecutor.schedule(callback);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 58c708c29cc..fc7da959e16 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -177,7 +177,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         boolean rollbackOnlyLast = ee.isRollbackOnlyLast();
         ee.setRollbackOnlyLast(false);
         // and we should not be regarded as exhausted as we are in a onCompletion block
-        boolean exhausted = ee.adapt(ExtendedExchange.class).isRedeliveryExhausted();
+        boolean exhausted = ee.getExchangeExtension().isRedeliveryExhausted();
         ee.setRedeliveryExhausted(false);
 
         Exception cause = ee.getException();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 522a53f6944..4a1bd3605c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -322,7 +322,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
                 // restore caused exception
                 exchange.setException(cause);
                 // remove the exhausted marker as we want to be able to perform redeliveries with the error handler
-                exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+                exchange.getExchangeExtension().setRedeliveryExhausted(false);
 
                 // preserve the redelivery stats
                 if (redeliveried != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b7a4e9a7a6d..b96e38ae395 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -42,7 +42,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Predicate;
@@ -1435,8 +1434,8 @@ public class AggregateProcessor extends AsyncProcessorSupport
                                         // set redelivery counter
                                         exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
                                         // and prepare for sending to DLC
-                                        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
-                                        exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
+                                        exchange.getExchangeExtension().setRedeliveryExhausted(false);
+                                        exchange.setRollbackOnly(false);
                                         deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
                                     } catch (Throwable e) {
                                         exchange.setException(e);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
index 3a74e89c82e..59d00a72fd8 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ErrorHandler;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
@@ -47,7 +46,7 @@ public class NoErrorHandler extends ErrorHandlerSupport implements AsyncProcesso
         return output.process(exchange, new AsyncCallback() {
             @Override
             public void done(boolean doneSync) {
-                exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+                exchange.getExchangeExtension().setRedeliveryExhausted(false);
                 callback.done(doneSync);
             }
         });
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
index 88d6e806453..f56ddfbbe9b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
@@ -18,7 +18,6 @@ package org.apache.camel.support;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.UnitOfWork;
@@ -71,7 +70,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler {
         // mark as bridged
         exchange.setProperty(ExchangePropertyKey.ERRORHANDLER_BRIDGE, true);
         // and mark as redelivery exhausted as we cannot do redeliveries
-        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+        exchange.getExchangeExtension().setRedeliveryExhausted(true);
 
         // wrap in UoW
         UnitOfWork uow = null;