You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/14 12:54:04 UTC

[camel] 19/23: CAMEL-15105: adapt and cleanup camel-core to the new extension interface

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 70b31903962acc5dd61426938a0f12d4d12f0fd9
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 23 09:27:14 2022 +0100

    CAMEL-15105: adapt and cleanup camel-core to the new extension interface
---
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  5 ++-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  5 ++-
 .../engine/PooledProcessorExchangeFactory.java     |  9 +++--
 .../org/apache/camel/processor/CatchProcessor.java | 14 ++++----
 .../java/org/apache/camel/processor/Enricher.java  |  3 +-
 .../camel/processor/FatalFallbackErrorHandler.java |  3 +-
 .../apache/camel/processor/MulticastProcessor.java |  6 ++--
 .../camel/processor/OnCompletionProcessor.java     | 42 +++++++++++-----------
 .../org/apache/camel/processor/PipelineHelper.java | 10 +++---
 .../errorhandler/RedeliveryErrorHandler.java       |  3 +-
 .../transformer/ProcessorTransformer.java          |  3 +-
 .../org/apache/camel/impl/DefaultExchangeTest.java |  2 +-
 .../java/org/apache/camel/support/EventHelper.java | 19 +++++-----
 .../org/apache/camel/support/MessageHelper.java    |  3 +-
 14 files changed, 55 insertions(+), 72 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index b89f0cbd924..4e725fe8fee 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
@@ -200,7 +199,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
                 }
                 exchange.setException(new RejectedExecutionException(
                         "Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
-                exchange.adapt(ExtendedExchange.class).setInterrupted(true);
+                exchange.getExchangeExtension().setInterrupted(true);
                 entry.getLatch().countDown();
             }
         }
@@ -312,7 +311,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
 
         @Override
         public String getNodeId() {
-            return exchange.adapt(ExtendedExchange.class).getHistoryNodeId();
+            return exchange.getExchangeExtension().getHistoryNodeId();
         }
 
         public CountDownLatch getLatch() {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 3c6cb7af9d6..55385cfcb40 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -28,7 +28,6 @@ import java.util.function.Predicate;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.PooledExchange;
 import org.apache.camel.Processor;
@@ -320,7 +319,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
 
     @Override
     public void beginTransactedBy(Object key) {
-        exchange.adapt(ExtendedExchange.class).setTransacted(true);
+        exchange.getExchangeExtension().setTransacted(true);
         getTransactedBy().add(key);
     }
 
@@ -329,7 +328,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
         getTransactedBy().remove(key);
         // we may still be transacted even if we end this section of transaction
         boolean transacted = isTransacted();
-        exchange.adapt(ExtendedExchange.class).setTransacted(transacted);
+        exchange.getExchangeExtension().setTransacted(transacted);
     }
 
     @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index 48dcc697cbf..634d2db3cd7 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -20,7 +20,6 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.PooledExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ProcessorExchangeFactory;
@@ -59,7 +58,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
 
     @Override
     public Exchange createCopy(Exchange exchange) {
-        ExtendedExchange answer = (ExtendedExchange) pool.poll();
+        Exchange answer = pool.poll();
         if (answer == null) {
             if (statisticsEnabled) {
                 statistics.created.increment();
@@ -82,7 +81,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
 
     @Override
     public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
-        ExtendedExchange answer = (ExtendedExchange) pool.poll();
+        Exchange answer = pool.poll();
         if (answer == null) {
             if (statisticsEnabled) {
                 statistics.created.increment();
@@ -91,7 +90,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
             answer = new DefaultPooledExchange(exchange);
             // if creating a copy via constructor (as above) then the unit of work is also
             // copied over to answer, which we then must set to null as we do not want to share unit of work
-            answer.setUnitOfWork(null);
+            answer.getExchangeExtension().setUnitOfWork(null);
         } else {
             if (statisticsEnabled) {
                 statistics.acquired.increment();
@@ -107,7 +106,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
         answer.getIn().setMessageId(null);
         if (handover) {
             // Need to hand over the completion for async invocation
-            answer.handoverCompletions(exchange);
+            answer.getExchangeExtension().handoverCompletions(exchange);
         }
         // set a correlation id so we can track back the original exchange
         answer.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index 64be8c6814a..90f7c0bad17 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -21,7 +21,6 @@ import java.util.List;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
@@ -98,13 +97,12 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
         }
 
         // must remember some properties which we cannot use during doCatch processing
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        final boolean stop = ee.isRouteStop();
-        ee.setRouteStop(false);
-        final boolean rollbackOnly = ee.isRollbackOnly();
-        ee.setRollbackOnly(false);
-        final boolean rollbackOnlyLast = ee.isRollbackOnlyLast();
-        ee.setRollbackOnlyLast(false);
+        final boolean stop = exchange.isRouteStop();
+        exchange.setRouteStop(false);
+        final boolean rollbackOnly = exchange.isRollbackOnly();
+        exchange.setRollbackOnly(false);
+        final boolean rollbackOnlyLast = exchange.isRollbackOnlyLast();
+        exchange.setRollbackOnlyLast(false);
 
         // store the last to endpoint as the failure endpoint
         if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index f4acc5d4a20..8563056ab43 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -26,7 +26,6 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProcessorExchangeFactory;
@@ -210,7 +209,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         if (isShareUnitOfWork()) {
             target.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, source.getUnitOfWork());
             // and then share the unit of work
-            target.adapt(ExtendedExchange.class).setUnitOfWork(source.getUnitOfWork());
+            target.getExchangeExtension().setUnitOfWork(source.getUnitOfWork());
         }
         return target;
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
index 168797155da..7208ebd943e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
@@ -22,7 +22,6 @@ import java.util.Deque;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ErrorHandler;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -71,7 +70,7 @@ public class FatalFallbackErrorHandler extends DelegateAsyncProcessor implements
             // mark this exchange as already been error handler handled (just by having this property)
             // the false value mean the caught exception will be kept on the exchange, causing the
             // exception to be propagated back to the caller, and to break out routing
-            exchange.adapt(ExtendedExchange.class).setErrorHandlerHandled(false);
+            exchange.getExchangeExtension().setErrorHandlerHandled(false);
             exchange.setProperty(ExchangePropertyKey.ERRORHANDLER_CIRCUIT_DETECTED, true);
             callback.done(true);
             return true;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index d4602468844..8816113e1ab 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -44,7 +44,6 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -786,8 +785,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         // also we would need to know if any error handler has attempted redelivery and exhausted
         boolean stoppedOnException = false;
         boolean exception = false;
-        ExtendedExchange see = (ExtendedExchange) subExchange;
-        boolean exhaust = forceExhaust || see != null && (see.getException() != null || see.isRedeliveryExhausted());
+        boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || subExchange.getExchangeExtension().isRedeliveryExhausted());
         if (original.getException() != null || subExchange != null && subExchange.getException() != null) {
             // there was an exception and we stopped
             stoppedOnException = isStopOnException();
@@ -911,7 +909,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         for (Processor processor : processors) {
             // copy exchange, and do not share the unit of work
             Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
-            copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());
+            copy.getExchangeExtension().setTransacted(exchange.isTransacted());
             // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData
             // during the transaction.
             if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index fc7da959e16..b4b2cf3265c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -26,7 +26,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Ordered;
 import org.apache.camel.Predicate;
@@ -163,26 +162,25 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
      * @param exchange  the exchange
      */
     protected static void doProcess(Processor processor, Exchange exchange) {
-        ExtendedExchange ee = (ExtendedExchange) exchange;
         // must remember some properties which we cannot use during onCompletion processing
         // as otherwise we may cause issues
         // but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
-        boolean stop = ee.isRouteStop();
-        ee.setRouteStop(false);
-        Object failureHandled = ee.removeProperty(ExchangePropertyKey.FAILURE_HANDLED);
-        Boolean errorhandlerHandled = ee.getErrorHandlerHandled();
-        ee.setErrorHandlerHandled(null);
-        boolean rollbackOnly = ee.isRollbackOnly();
-        ee.setRollbackOnly(false);
-        boolean rollbackOnlyLast = ee.isRollbackOnlyLast();
-        ee.setRollbackOnlyLast(false);
+        boolean stop = exchange.isRouteStop();
+        exchange.setRouteStop(false);
+        Object failureHandled = exchange.removeProperty(ExchangePropertyKey.FAILURE_HANDLED);
+        Boolean errorhandlerHandled = exchange.getExchangeExtension().getErrorHandlerHandled();
+        exchange.getExchangeExtension().setErrorHandlerHandled(null);
+        boolean rollbackOnly = exchange.isRollbackOnly();
+        exchange.setRollbackOnly(false);
+        boolean rollbackOnlyLast = exchange.isRollbackOnlyLast();
+        exchange.setRollbackOnlyLast(false);
         // and we should not be regarded as exhausted as we are in a onCompletion block
-        boolean exhausted = ee.getExchangeExtension().isRedeliveryExhausted();
-        ee.setRedeliveryExhausted(false);
+        boolean exhausted = exchange.getExchangeExtension().isRedeliveryExhausted();
+        exchange.getExchangeExtension().setRedeliveryExhausted(false);
 
-        Exception cause = ee.getException();
+        Exception cause = exchange.getException();
         if (cause != null) {
-            ee.setException(null);
+            exchange.setException(null);
         }
 
         try {
@@ -191,18 +189,18 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
             exchange.setException(e);
         } finally {
             // restore the options
-            ee.setRouteStop(stop);
+            exchange.setRouteStop(stop);
             if (failureHandled != null) {
-                ee.setProperty(ExchangePropertyKey.FAILURE_HANDLED, failureHandled);
+                exchange.setProperty(ExchangePropertyKey.FAILURE_HANDLED, failureHandled);
             }
             if (errorhandlerHandled != null) {
-                ee.setErrorHandlerHandled(errorhandlerHandled);
+                exchange.getExchangeExtension().setErrorHandlerHandled(errorhandlerHandled);
             }
-            ee.setRollbackOnly(rollbackOnly);
-            ee.setRollbackOnlyLast(rollbackOnlyLast);
-            ee.setRedeliveryExhausted(exhausted);
+            exchange.setRollbackOnly(rollbackOnly);
+            exchange.setRollbackOnlyLast(rollbackOnlyLast);
+            exchange.getExchangeExtension().setRedeliveryExhausted(exhausted);
             if (cause != null) {
-                ee.setException(cause);
+                exchange.setException(cause);
             }
         }
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PipelineHelper.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PipelineHelper.java
index 4e96f8c3ffb..89e15098dca 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PipelineHelper.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PipelineHelper.java
@@ -17,7 +17,6 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.slf4j.Logger;
 
 /**
@@ -39,9 +38,8 @@ public final class PipelineHelper {
      *                  occurred.
      */
     public static boolean continueProcessing(Exchange exchange, String message, Logger log) {
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()
-                || ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled();
+        boolean stop = exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()
+                || exchange.getExchangeExtension().isErrorHandlerHandledSet() && exchange.getExchangeExtension().isErrorHandlerHandled();
         if (stop) {
             // The errorErrorHandler is only set if satisfactory handling was done
             // by the error handler. It's still an exception, the exchange still failed.
@@ -54,7 +52,7 @@ public final class PipelineHelper {
                 if (exchange.getException() != null) {
                     sb.append(" Exception: ").append(exchange.getException());
                 }
-                if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {
+                if (exchange.getExchangeExtension().isErrorHandlerHandledSet() && exchange.getExchangeExtension().isErrorHandlerHandled()) {
                     sb.append(" Handled by the error handler.");
                 }
                 log.debug(sb.toString());
@@ -64,7 +62,7 @@ public final class PipelineHelper {
         }
 
         // check for stop
-        if (ee.isRouteStop()) {
+        if (exchange.isRouteStop()) {
             if (log.isDebugEnabled()) {
                 log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
             }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 01335cb8acd..44bcb60b9f5 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -30,7 +30,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Navigate;
@@ -679,7 +678,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated Exchange
             this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
-            this.exchange = (ExtendedExchange) exchange;
+            this.exchange = exchange;
             this.callback = callback;
         }
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/transformer/ProcessorTransformer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/transformer/ProcessorTransformer.java
index cbab3c75ae4..62455d9422f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/transformer/ProcessorTransformer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/transformer/ProcessorTransformer.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor.transformer;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.DataType;
@@ -70,7 +69,7 @@ public class ProcessorTransformer extends Transformer {
         // must create a copy in this way
         Exchange transformExchange = new DefaultExchange(exchange);
         transformExchange.setIn(message);
-        transformExchange.adapt(ExtendedExchange.class).setProperties(exchange.getProperties());
+        transformExchange.getExchangeExtension().setProperties(exchange.getProperties());
         processor.process(transformExchange);
         Message answer = transformExchange.getMessage();
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
index 766fa21a935..627d6df8d39 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
@@ -304,7 +304,7 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
         DefaultExchange exchange = new DefaultExchange(context);
         SafeProperty property = new SafeProperty();
         UnsafeProperty unsafeProperty = new UnsafeProperty();
-        exchange.setSafeCopyProperty(SAFE_PROPERTY, property);
+        exchange.getExchangeExtension().setSafeCopyProperty(SAFE_PROPERTY, property);
         exchange.setProperty(UNSAFE_PROPERTY, unsafeProperty);
 
         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
index 3d88c9826ba..317d2afd4ff 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
@@ -22,7 +22,6 @@ import java.util.function.BiFunction;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.spi.CamelEvent;
@@ -634,7 +633,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -680,7 +679,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -726,7 +725,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -774,7 +773,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -822,7 +821,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -868,7 +867,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -914,7 +913,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
@@ -960,7 +959,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for notify event
             return false;
         }
@@ -1474,7 +1473,7 @@ public final class EventHelper {
             return false;
         }
 
-        if (((ExtendedExchange) exchange).isNotifyEvent()) {
+        if (exchange.getExchangeExtension().isNotifyEvent()) {
             // do not generate events for an notify event
             return false;
         }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageHelper.java
index 095e21b995f..582ba563844 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/MessageHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageHelper.java
@@ -30,7 +30,6 @@ import java.util.TreeMap;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.Route;
@@ -730,7 +729,7 @@ public final class MessageHelper {
             // instead
             id = exchange.getExchangeExtension().getHistoryNodeId();
             if (id != null) {
-                loc = exchange.adapt(ExtendedExchange.class).getHistoryNodeSource();
+                loc = exchange.getExchangeExtension().getHistoryNodeSource();
                 if (loc == null) {
                     loc = "";
                 }