You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/14 12:53:55 UTC
[camel] 10/23: CAMEL-15105: rework redelivery handling
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ceaffab948b9ab441b984a1fabaaf3731ba0996e
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Nov 22 18:28:49 2022 +0100
CAMEL-15105: rework redelivery handling
---
.../microprofile/faulttolerance/FaultToleranceProcessor.java | 2 +-
.../org/apache/camel/component/resilience4j/ResilienceProcessor.java | 3 +--
.../java/org/apache/camel/impl/engine/CamelInternalProcessor.java | 4 ++--
.../src/main/java/org/apache/camel/processor/CatchProcessor.java | 4 ++--
.../src/main/java/org/apache/camel/processor/MulticastProcessor.java | 2 +-
.../main/java/org/apache/camel/processor/OnCompletionProcessor.java | 2 +-
.../src/main/java/org/apache/camel/processor/PollEnricher.java | 2 +-
.../org/apache/camel/processor/aggregate/AggregateProcessor.java | 5 ++---
.../java/org/apache/camel/processor/errorhandler/NoErrorHandler.java | 3 +--
.../apache/camel/support/BridgeExceptionHandlerToErrorHandler.java | 3 +--
10 files changed, 13 insertions(+), 17 deletions(-)
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 6c148e068ea..30b8cc538d7 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -548,7 +548,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
exchange.setException(null);
// and we should not be regarded as exhausted as we are in a try ..
// catch block
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
// run the fallback processor
try {
LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange);
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 83234a6b165..97a8aa9a17e 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -41,7 +41,6 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
@@ -683,7 +682,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
exchange.setException(null);
// and we should not be regarded as exhausted as we are in a try ..
// catch block
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
// run the fallback processor
try {
LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index ff2e6c51067..05e7b3d1d17 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -961,7 +961,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
exchange.setException(tce);
// because this is stream caching error then we cannot use redelivery as the message body is corrupt
// so mark as redelivery exhausted
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
}
// check if we somewhere failed due to a stream caching exception
Throwable cause = exchange.getException();
@@ -987,7 +987,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
exchange.setException(tce);
// because this is stream caching error then we cannot use redelivery as the message body is corrupt
// so mark as redelivery exhausted
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
}
}
return null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index b13fc20f890..64be8c6814a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -115,7 +115,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, e);
exchange.setException(null);
// and we should not be regarded as exhausted as we are in a try .. catch block
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
if (LOG.isDebugEnabled()) {
LOG.debug("The exception is handled for the exception: {} caused by: {}",
@@ -131,7 +131,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, false, null);
// always clear redelivery exhausted in a catch clause
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
if (rollbackOnly || rollbackOnlyLast || stop) {
exchange.setRouteStop(stop);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index af740f9b1c2..d4602468844 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -827,7 +827,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
// multicast uses error handling on its output processors and they have tried to redeliver
// so we shall signal back to the other error handlers that we are exhausted and they should not
// also try to redeliver as we would then do that twice
- original.adapt(ExtendedExchange.class).setRedeliveryExhausted(exhaust);
+ original.getExchangeExtension().setRedeliveryExhausted(exhaust);
}
reactiveExecutor.schedule(callback);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 58c708c29cc..fc7da959e16 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -177,7 +177,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
boolean rollbackOnlyLast = ee.isRollbackOnlyLast();
ee.setRollbackOnlyLast(false);
// and we should not be regarded as exhausted as we are in a onCompletion block
- boolean exhausted = ee.adapt(ExtendedExchange.class).isRedeliveryExhausted();
+ boolean exhausted = ee.getExchangeExtension().isRedeliveryExhausted();
ee.setRedeliveryExhausted(false);
Exception cause = ee.getException();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 522a53f6944..4a1bd3605c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -322,7 +322,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
// restore caused exception
exchange.setException(cause);
// remove the exhausted marker as we want to be able to perform redeliveries with the error handler
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
// preserve the redelivery stats
if (redeliveried != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b7a4e9a7a6d..b96e38ae395 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -42,7 +42,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Predicate;
@@ -1435,8 +1434,8 @@ public class AggregateProcessor extends AsyncProcessorSupport
// set redelivery counter
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
// and prepare for sending to DLC
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
- exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
+ exchange.setRollbackOnly(false);
deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
} catch (Throwable e) {
exchange.setException(e);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
index 3a74e89c82e..59d00a72fd8 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ErrorHandler;
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
@@ -47,7 +46,7 @@ public class NoErrorHandler extends ErrorHandlerSupport implements AsyncProcesso
return output.process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.getExchangeExtension().setRedeliveryExhausted(false);
callback.done(doneSync);
}
});
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
index 88d6e806453..f56ddfbbe9b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
@@ -18,7 +18,6 @@ package org.apache.camel.support;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
@@ -71,7 +70,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler {
// mark as bridged
exchange.setProperty(ExchangePropertyKey.ERRORHANDLER_BRIDGE, true);
// and mark as redelivery exhausted as we cannot do redeliveries
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
// wrap in UoW
UnitOfWork uow = null;