You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/25 15:43:55 UTC

[camel] 01/02: CAMEL-15753: camel-core - Modularize reifier should not use base

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c531f0e580c64f0729b48c70c77180d08d81c1f2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Oct 25 15:19:23 2020 +0100

    CAMEL-15753: camel-core - Modularize reifier should not use base
---
 .../src/main/java/org/apache/camel/Channel.java    | 35 +++++++++++++++-
 .../spi/ErrorHandlerRedeliveryCustomizer.java      | 48 +++++++++++++++++++++
 .../org/apache/camel/spi/InternalProcessor.java    | 39 ++++++++++-------
 ...Processor.java => SharedInternalProcessor.java} |  5 ++-
 .../org/apache/camel/spi/WrapAwareProcessor.java   | 31 ++++++++++++++
 .../camel/impl/engine/CamelInternalProcessor.java  | 48 +++++++++++++++------
 .../apache/camel/impl/engine}/DefaultChannel.java  | 49 +++++++++-------------
 .../camel/impl/engine/DefaultProducerCache.java    | 15 +++----
 .../impl/cloud/ServiceCallConfigurationTest.java   |  2 +-
 .../camel/impl/lw/LightweightCamelContext.java     |  2 +-
 .../camel/processor/DefaultProcessorFactory.java   | 17 ++++++++
 .../apache/camel/processor/MulticastProcessor.java |  1 +
 .../processor/SharedCamelInternalProcessor.java    |  4 +-
 .../apache/camel/processor/UnitOfWorkProducer.java | 20 ++++++---
 .../org/apache/camel/processor/WrapProcessor.java  | 11 +++--
 .../errorhandler/RedeliveryErrorHandler.java       | 20 ++-------
 core/camel-core-reifier/pom.xml                    | 17 +++++---
 .../org/apache/camel/reifier/AggregateReifier.java | 15 ++++---
 .../apache/camel/reifier/OnCompletionReifier.java  | 17 +++++---
 .../org/apache/camel/reifier/ProcessorReifier.java |  9 ++--
 .../apache/camel/reifier/ResequenceReifier.java    | 25 +++++++----
 .../org/apache/camel/reifier/RouteReifier.java     | 31 ++++++--------
 .../org/apache/camel/reifier/WireTapReifier.java   | 17 +++++---
 .../RandomLoadBalanceJavaDSLBuilderTest.java       |  2 +-
 .../apache/camel/processor/ResequencerTest.java    |  2 +-
 25 files changed, 330 insertions(+), 152 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Channel.java b/core/camel-api/src/main/java/org/apache/camel/Channel.java
index 6a5911e..e2777f8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Channel.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Channel.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel;
 
+import java.util.List;
+
+import org.apache.camel.spi.ErrorHandler;
+import org.apache.camel.spi.InterceptStrategy;
+
 /**
  * Channel acts as a channel between {@link Processor}s in the route graph.
  * <p/>
@@ -24,13 +29,41 @@ package org.apache.camel;
 public interface Channel extends AsyncProcessor, Navigate<Processor> {
 
     /**
-     * Gets the {@link org.apache.camel.processor.ErrorHandler} this Channel uses.
+     * Initializes the channel. If the initialized output definition contained outputs (children) then the
+     * childDefinition will be set so we can leverage fine grained tracing
+     */
+    void initChannel(
+            Route route,
+            NamedNode definition,
+            NamedNode childDefinition,
+            List<InterceptStrategy> interceptors,
+            Processor nextProcessor,
+            NamedRoute routeDefinition,
+            boolean first)
+            throws Exception;
+
+    /**
+     * Post initializes the channel.
+     *
+     * @throws Exception is thrown if some error occurred
+     */
+    void postInitChannel() throws Exception;
+
+    /**
+     * Gets the {@link ErrorHandler} this Channel uses.
      *
      * @return the error handler, or <tt>null</tt> if no error handler is used.
      */
     Processor getErrorHandler();
 
     /**
+     * Sets the {@link ErrorHandler} this Channel uses.
+     *
+     * @param errorHandler the error handler
+     */
+    void setErrorHandler(Processor errorHandler);
+
+    /**
      * Gets the wrapped output that at runtime should be delegated to.
      *
      * @return the output to route the {@link Exchange} to
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java b/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java
new file mode 100644
index 0000000..81eb10d
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * Customizer for {@link ErrorHandler} which supports redeliveries. This is used internally by Camel to instrument the
+ * error handler with additional instrumentations during route initialization.
+ */
+public interface ErrorHandlerRedeliveryCustomizer {
+
+    /**
+     * Determines if redelivery is enabled by checking if any of the redelivery policy settings may allow redeliveries.
+     *
+     * @return           <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
+     * @throws Exception can be thrown
+     */
+    boolean determineIfRedeliveryIsEnabled() throws Exception;
+
+    /**
+     * Returns the output processor
+     */
+    Processor getOutput();
+
+    /**
+     * Allows to change the output of the error handler which are used when optimising the JMX instrumentation to use
+     * either an advice or wrapped processor when calling a processor. The former is faster and therefore preferred,
+     * however if the error handler supports redelivery we need fine grained instrumentation which then must be wrapped
+     * and therefore need to change the output on the error handler.
+     */
+    void changeOutput(Processor output);
+
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
index 5b81f74..f5fd933 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
@@ -16,13 +16,13 @@
  */
 package org.apache.camel.spi;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
+import java.util.List;
+
 import org.apache.camel.Processor;
+import org.apache.camel.Route;
 
 /**
- * An internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
+ * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
  * <ul>
  * <li>Execute {@link UnitOfWork}</li>
  * <li>Keeping track which route currently is being routed</li>
@@ -35,25 +35,32 @@ import org.apache.camel.Processor;
  * <li>{@link Transformer}</li>
  * </ul>
  * ... and more.
- * <p/>
- *
- * This is intended for internal use only - do not use this.
  */
 public interface InternalProcessor extends Processor {
 
-    @Override
-    default void process(Exchange exchange) throws Exception {
-        // not in use
-    }
-
     /**
-     * Asynchronous API
+     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
+     *
+     * @param advice the advice to add
      */
-    boolean process(Exchange exchange, AsyncCallback originalCallback, AsyncProcessor processor, Processor resultProcessor);
+    void addAdvice(CamelInternalProcessorAdvice<?> advice);
 
     /**
-     * Synchronous API
+     * Gets the advice with the given type.
+     *
+     * @param  type the type of the advice
+     * @return      the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
      */
-    void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor);
+    <T> T getAdvice(Class<T> type);
+
+    void addRoutePolicyAdvice(List<RoutePolicy> routePolicyList);
+
+    void addRouteInflightRepositoryAdvice(InflightRepository inflightRepository, String routeId);
+
+    void addRouteLifecycleAdvice();
+
+    void addManagementInterceptStrategy(ManagementInterceptStrategy.InstrumentationProcessor processor);
+
+    void setRouteOnAdvices(Route route);
 
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
similarity index 89%
copy from core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
copy to core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
index 5b81f74..29c4159 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
@@ -22,7 +22,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 /**
- * An internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
+ * A Shared (thread safe) internal {@link Processor} that Camel routing engine used during routing for cross cutting
+ * functionality such as:
  * <ul>
  * <li>Execute {@link UnitOfWork}</li>
  * <li>Keeping track which route currently is being routed</li>
@@ -39,7 +40,7 @@ import org.apache.camel.Processor;
  *
  * This is intended for internal use only - do not use this.
  */
-public interface InternalProcessor extends Processor {
+public interface SharedInternalProcessor extends Processor {
 
     @Override
     default void process(Exchange exchange) throws Exception {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java
new file mode 100644
index 0000000..3b346c5
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * An interface to represent an object which wraps a {@link Processor}.
+ */
+public interface WrapAwareProcessor {
+
+    /**
+     * Gets the wrapped {@link Processor}
+     */
+    Processor getWrapped();
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 8fbb08f..fd00655 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -42,6 +42,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
 import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -91,7 +92,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order the advices are executed.
  */
-public class CamelInternalProcessor extends DelegateAsyncProcessor {
+public class CamelInternalProcessor extends DelegateAsyncProcessor implements InternalProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
 
@@ -116,11 +117,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
-    /**
-     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
-     *
-     * @param advice the advice to add
-     */
+    @Override
     public void addAdvice(CamelInternalProcessorAdvice<?> advice) {
         advices.add(advice);
         // ensure advices are sorted so they are in the order we want
@@ -131,12 +128,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         }
     }
 
-    /**
-     * Gets the advice with the given type.
-     *
-     * @param  type the type of the advice
-     * @return      the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
-     */
+    @Override
     public <T> T getAdvice(Class<T> type) {
         for (CamelInternalProcessorAdvice task : advices) {
             Object advice = unwrap(task);
@@ -147,6 +139,38 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         return null;
     }
 
+    @Override
+    public void addRoutePolicyAdvice(List<RoutePolicy> routePolicyList) {
+        addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
+    }
+
+    @Override
+    public void addRouteInflightRepositoryAdvice(InflightRepository inflightRepository, String routeId) {
+        addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));
+    }
+
+    @Override
+    public void addRouteLifecycleAdvice() {
+        addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
+    }
+
+    @Override
+    public void addManagementInterceptStrategy(InstrumentationProcessor processor) {
+        addAdvice(CamelInternalProcessor.wrap(processor));
+    }
+
+    @Override
+    public void setRouteOnAdvices(Route route) {
+        RoutePolicyAdvice task = getAdvice(RoutePolicyAdvice.class);
+        if (task != null) {
+            task.setRoute(route);
+        }
+        RouteLifecycleAdvice task2 = getAdvice(RouteLifecycleAdvice.class);
+        if (task2 != null) {
+            task2.setRoute(route);
+        }
+    }
+
     /**
      * Callback task to process the advices after processing.
      */
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
similarity index 89%
rename from core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index 8692457..a36bd4d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.processor.channel;
+package org.apache.camel.impl.engine;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -26,21 +27,20 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NamedNode;
 import org.apache.camel.NamedRoute;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.apache.camel.impl.debugger.BacklogTracer;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
-import org.apache.camel.processor.WrapProcessor;
-import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
 import org.apache.camel.spi.Debugger;
-import org.apache.camel.spi.ErrorHandler;
+import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.Tracer;
+import org.apache.camel.spi.WrapAwareProcessor;
 import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
@@ -105,11 +105,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
         return nextProcessor;
     }
 
-    /**
-     * Sets the {@link ErrorHandler} this Channel uses.
-     *
-     * @param errorHandler the error handler
-     */
+    @Override
     public void setErrorHandler(Processor errorHandler) {
         this.errorHandler = errorHandler;
     }
@@ -157,15 +153,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
         ServiceHelper.stopAndShutdownServices(output, errorHandler);
     }
 
-    /**
-     * Initializes the channel. If the initialized output definition contained outputs (children) then the
-     * childDefinition will be set so we can leverage fine grained tracing
-     *
-     * @param  route           the route context
-     * @param  definition      the route definition the {@link Channel} represents
-     * @param  childDefinition the child definition
-     * @throws Exception       is thrown if some error occurred
-     */
+    @Override
     public void initChannel(
             Route route,
             NamedNode definition,
@@ -249,9 +237,13 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
                          + " but its not the most optimal solution. Please consider changing your interceptor to comply.",
                         strategy, definition);
             }
-            if (!(wrapped instanceof WrapProcessor)) {
+            if (!(wrapped instanceof WrapAwareProcessor)) {
                 // wrap the target so it becomes a service and we can manage its lifecycle
-                wrapped = new WrapProcessor(wrapped, target);
+                Map<String, Object> args = new HashMap<>();
+                args.put("processor", wrapped);
+                args.put("wrapped", target);
+                wrapped = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                        .createProcessor(camelContext, "WrapProcessor", args);
             }
             target = wrapped;
         }
@@ -268,24 +260,21 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
         output = target;
     }
 
-    /**
-     * Post initializes the channel.
-     *
-     * @throws Exception is thrown if some error occurred
-     */
+    @Override
     public void postInitChannel() throws Exception {
         // if jmx was enabled for the processor then either add as advice or wrap and change the processor
         // on the error handler. See more details in the class javadoc of InstrumentationProcessor
         if (instrumentationProcessor != null) {
             boolean redeliveryPossible = false;
-            if (errorHandler instanceof RedeliveryErrorHandler) {
-                redeliveryPossible = ((RedeliveryErrorHandler) errorHandler).determineIfRedeliveryIsEnabled();
+            if (errorHandler instanceof ErrorHandlerRedeliveryCustomizer) {
+                ErrorHandlerRedeliveryCustomizer erh = (ErrorHandlerRedeliveryCustomizer) errorHandler;
+                redeliveryPossible = erh.determineIfRedeliveryIsEnabled();
                 if (redeliveryPossible) {
                     // okay we can redeliver then we need to change the output in the error handler
                     // to use us which we then wrap the call so we can capture before/after for redeliveries as well
-                    Processor currentOutput = ((RedeliveryErrorHandler) errorHandler).getOutput();
+                    Processor currentOutput = erh.getOutput();
                     instrumentationProcessor.setProcessor(currentOutput);
-                    ((RedeliveryErrorHandler) errorHandler).changeOutput(instrumentationProcessor);
+                    erh.changeOutput(instrumentationProcessor);
                 }
             }
             if (!redeliveryPossible) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index aad974a..19bb01b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -33,8 +33,8 @@ import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StatefulService;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
-import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.SharedInternalProcessor;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.service.ServiceHelper;
@@ -54,7 +54,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
     private final ExtendedCamelContext camelContext;
     private final ProducerServicePool producers;
     private final Object source;
-    private final InternalProcessor internalProcessor;
+    private final SharedInternalProcessor sharedInternalProcessor;
 
     private EndpointUtilizationStatistics statistics;
     private boolean eventNotifierEnabled = true;
@@ -82,12 +82,13 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
 
         // internal processor used for sending
         try {
-            internalProcessor = (InternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
-                    "SharedCamelInternalProcessor", null);
+            sharedInternalProcessor
+                    = (SharedInternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
+                            "SharedCamelInternalProcessor", null);
         } catch (Exception e) {
             throw RuntimeCamelException.wrapRuntimeException(e);
         }
-        if (internalProcessor == null) {
+        if (sharedInternalProcessor == null) {
             throw new IllegalStateException(
                     "Cannot create SharedCamelInternalProcessor from ProcessorFactory." +
                                             "If you have a custom ProcessorFactory then extend DefaultProcessorFactory and let the default able to create SharedCamelInternalProcessor");
@@ -195,7 +196,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
                 }
 
                 // invoke the synchronous method
-                internalProcessor.process(exchange, producer, resultProcessor);
+                sharedInternalProcessor.process(exchange, producer, resultProcessor);
 
             } catch (Throwable e) {
                 // ensure exceptions is caught and set on the exchange
@@ -362,7 +363,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
                 callback = new EventNotifierCallback(callback, exchange, endpoint);
             }
             // invoke the asynchronous method
-            return internalProcessor.process(exchange, callback, producer, resultProcessor);
+            return sharedInternalProcessor.process(exchange, callback, producer, resultProcessor);
         } catch (Throwable e) {
             // ensure exceptions is caught and set on the exchange
             exchange.setException(e);
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
index 53bed2e..7fac92a 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
@@ -30,7 +30,7 @@ import org.apache.camel.model.cloud.ServiceCallConfigurationDefinition;
 import org.apache.camel.model.cloud.ServiceCallDefinitionConstants;
 import org.apache.camel.model.cloud.ServiceCallExpressionConfiguration;
 import org.apache.camel.model.language.SimpleExpression;
-import org.apache.camel.processor.channel.DefaultChannel;
+import org.apache.camel.impl.engine.DefaultChannel;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index cb01e8c..9cae189 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -52,6 +52,7 @@ import org.apache.camel.ValueHolder;
 import org.apache.camel.builder.AdviceWithRouteBuilder;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.DefaultChannel;
 import org.apache.camel.impl.engine.DefaultRoute;
 import org.apache.camel.model.DataFormatDefinition;
 import org.apache.camel.model.FaultToleranceConfigurationDefinition;
@@ -67,7 +68,6 @@ import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.model.transformer.TransformerDefinition;
 import org.apache.camel.model.validator.ValidatorDefinition;
-import org.apache.camel.processor.channel.DefaultChannel;
 import org.apache.camel.spi.AnnotationBasedProcessorFactory;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.BeanIntrospection;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
index cb4b71e..475a7c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
@@ -32,6 +32,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Route;
 import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.impl.engine.DefaultChannel;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.spi.ProcessorFactory;
@@ -87,6 +88,10 @@ public class DefaultProcessorFactory implements ProcessorFactory {
         return null;
     }
 
+    // TODO: Make an InternalProcessorFactory that are not for end users
+    // TODO: Add API with suitable method names on InternalProcessorFactory instead of this generic with Map args
+    // TODO: For map args then use Object[] as its faster
+
     @Override
     @SuppressWarnings("unchecked")
     public Processor createProcessor(CamelContext camelContext, String definitionName, Map<String, Object> args)
@@ -120,6 +125,10 @@ public class DefaultProcessorFactory implements ProcessorFactory {
         } else if ("UnitOfWorkProducer".equals(definitionName)) {
             Producer producer = (Producer) args.get("producer");
             return new UnitOfWorkProducer(producer);
+        } else if ("WrapProcessor".equals(definitionName)) {
+            Processor processor = (Processor) args.get("processor");
+            Processor wrapped = (Processor) args.get("wrapped");
+            return new WrapProcessor(processor, wrapped);
         } else if ("InterceptSendToEndpointProcessor".equals(definitionName)) {
             InterceptSendToEndpoint endpoint = (InterceptSendToEndpoint) args.get("endpoint");
             Endpoint delegate = (Endpoint) args.get("delegate");
@@ -129,6 +138,14 @@ public class DefaultProcessorFactory implements ProcessorFactory {
         } else if ("SharedCamelInternalProcessor".equals(definitionName)) {
             return new SharedCamelInternalProcessor(
                     camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
+        } else if ("CamelInternalProcessor".equals(definitionName)) {
+            Processor processor = (Processor) args.get("processor");
+            Route route = (Route) args.get("route");
+            CamelInternalProcessor answer = new CamelInternalProcessor(camelContext, processor);
+            answer.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+            return answer;
+        } else if ("DefaultChannel".equals(definitionName)) {
+            return new DefaultChannel(camelContext);
         }
 
         return null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 33abd8e..50f04ad 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -796,6 +796,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
     protected Processor createUnitOfWorkProcessor(Route route, Processor processor, Exchange exchange) {
         CamelInternalProcessor internal = new CamelInternalProcessor(exchange.getContext(), processor);
 
+        // TODO: use processor factory or spi
         // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
         UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
         if (parent != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index bf3a84c..f6c26f0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -31,9 +31,9 @@ import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
-import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.SharedInternalProcessor;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order the advices are executed.
  */
-public class SharedCamelInternalProcessor implements InternalProcessor {
+public class SharedCamelInternalProcessor implements SharedInternalProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
     private static final Object[] EMPTY_STATES = new Object[0];
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index d01c9c9..242fbdd 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,12 +16,16 @@
  */
 package org.apache.camel.processor;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.service.ServiceHelper;
 
@@ -42,10 +46,16 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
         super(producer.getEndpoint());
         this.producer = producer;
         // wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer);
-        internal.addAdvice(
-                new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, producer.getEndpoint().getCamelContext()));
-        this.processor = internal;
+
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", producer);
+        args.put("route", null);
+        ExtendedCamelContext ecc = producer.getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class);
+        try {
+            this.processor = (AsyncProcessor) ecc.getProcessorFactory().createProcessor(ecc, "UnitOfWorkProcessorAdvice", args);
+        } catch (Exception e) {
+            throw RuntimeCamelException.wrapRuntimeException(e);
+        }
     }
 
     @Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
index 76a6983..2658162 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
@@ -19,13 +19,14 @@ package org.apache.camel.processor;
 import java.util.List;
 
 import org.apache.camel.Processor;
+import org.apache.camel.spi.WrapAwareProcessor;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
  * A processor which ensures wrapping processors is having lifecycle handled.
  */
-public class WrapProcessor extends DelegateAsyncProcessor {
+public class WrapProcessor extends DelegateAsyncProcessor implements WrapAwareProcessor {
     private final Processor wrapped;
 
     public WrapProcessor(Processor processor, Processor wrapped) {
@@ -35,7 +36,12 @@ public class WrapProcessor extends DelegateAsyncProcessor {
 
     @Override
     public String toString() {
-        return "WrapDelegateAsyncProcessor[" + processor + "]";
+        return "WrapProcessor[" + processor + "]";
+    }
+
+    @Override
+    public Processor getWrapped() {
+        return wrapped;
     }
 
     @Override
@@ -57,5 +63,4 @@ public class WrapProcessor extends DelegateAsyncProcessor {
         super.doStop();
         ServiceHelper.stopService(wrapped);
     }
-
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 1ac482d..15869e4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -40,6 +40,7 @@ import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
+import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.ShutdownPrepared;
@@ -65,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * according to what they support.
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
-        implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+        implements ErrorHandlerRedeliveryCustomizer, AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class);
 
@@ -192,12 +193,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         return callback.getFuture();
     }
 
-    /**
-     * Allows to change the output of the error handler which are used when optimising the JMX instrumentation to use
-     * either an advice or wrapped processor when calling a processor. The former is faster and therefore preferred,
-     * however if the error handler supports redelivery we need fine grained instrumentation which then must be wrapped
-     * and therefore need to change the output on the error handler.
-     */
+    @Override
     public void changeOutput(Processor output) {
         this.output = output;
         this.outputAsync = AsyncProcessorConverterHelper.convert(output);
@@ -307,9 +303,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         return answer;
     }
 
-    /**
-     * Returns the output processor
-     */
     @Override
     public Processor getOutput() {
         return output;
@@ -1455,12 +1448,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         }
     }
 
-    /**
-     * Determines if redelivery is enabled by checking if any of the redelivery policy settings may allow redeliveries.
-     *
-     * @return           <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
-     * @throws Exception can be thrown
-     */
+    @Override
     public boolean determineIfRedeliveryIsEnabled() throws Exception {
         // determine if redeliver is enabled either on error handler
         if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
diff --git a/core/camel-core-reifier/pom.xml b/core/camel-core-reifier/pom.xml
index 8d99a4f..6483577 100644
--- a/core/camel-core-reifier/pom.xml
+++ b/core/camel-core-reifier/pom.xml
@@ -46,11 +46,6 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-api</artifactId>
         </dependency>
-        <!-- TODO: Remove this dependency -->
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-base</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
@@ -58,10 +53,22 @@
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-model</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.camel</groupId>
+                    <artifactId>camel-base</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-processor</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.camel</groupId>
+                    <artifactId>camel-base</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 3d7f7e7..e852aab 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -16,16 +16,19 @@
  */
 package org.apache.camel.reifier;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.AggregationStrategy;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
 import org.apache.camel.model.AggregateDefinition;
 import org.apache.camel.model.OptimisticLockRetryPolicyDefinition;
 import org.apache.camel.model.ProcessorDefinition;
@@ -50,10 +53,12 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
     protected AggregateProcessor createAggregator() throws Exception {
         Processor childProcessor = this.createChildProcessor(true);
 
-        // TODO: Make this via SPI or some facade
         // wrap the aggregate route in a unit of work processor
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", childProcessor);
+        args.put("route", route);
+        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
 
         Expression correlation = createExpression(definition.getExpression());
         AggregationStrategy strategy = createAggregationStrategy();
@@ -70,7 +75,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         }
 
         AggregateProcessor answer
-                = new AggregateProcessor(camelContext, internal, correlation, strategy, threadPool, shutdownThreadPool);
+                = new AggregateProcessor(camelContext, target, correlation, strategy, threadPool, shutdownThreadPool);
 
         AggregationRepository repository = createAggregationRepository();
         if (repository != null) {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 7866a00..32f5ce9 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -16,12 +16,15 @@
  */
 package org.apache.camel.reifier;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
 import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnCompletionMode;
 import org.apache.camel.model.ProcessorDefinition;
@@ -51,12 +54,14 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
 
         Processor childProcessor = this.createChildProcessor(true);
 
-        // TODO: Make this via SPI or some facade
         // wrap the on completion route in a unit of work processor
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", childProcessor);
+        args.put("route", route);
+        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
 
-        route.setOnCompletion(getId(definition), internal);
+        route.setOnCompletion(getId(definition), target);
 
         Predicate when = null;
         if (definition.getOnWhen() != null) {
@@ -71,7 +76,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
                 || parse(OnCompletionMode.class, definition.getMode()) == OnCompletionMode.AfterConsumer;
 
         OnCompletionProcessor answer = new OnCompletionProcessor(
-                camelContext, internal, threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when,
+                camelContext, target, threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when,
                 original, afterConsumer);
         return answer;
     }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index bf6297f..bc7288b 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -104,7 +104,6 @@ import org.apache.camel.model.WireTapDefinition;
 import org.apache.camel.model.cloud.ServiceCallDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.channel.DefaultChannel;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InterceptStrategy;
@@ -529,9 +528,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
 
     protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
             throws Exception {
-        // put a channel in between this and each output to control the route
-        // flow logic
-        DefaultChannel channel = new DefaultChannel(camelContext);
+        // put a channel in between this and each output to control the route flow logic
+        Channel channel = (Channel) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "DefaultChannel", null);
 
         // add interceptor strategies to the channel must be in this order:
         // camel context, route context, local
@@ -635,7 +634,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
      * @param  inheritErrorHandler whether to inherit error handler
      * @throws Exception           can be thrown if failed to create error handler builder
      */
-    private void wrapChannelInErrorHandler(DefaultChannel channel, Boolean inheritErrorHandler) throws Exception {
+    private void wrapChannelInErrorHandler(Channel channel, Boolean inheritErrorHandler) throws Exception {
         if (inheritErrorHandler == null || inheritErrorHandler) {
             log.trace("{} is configured to inheritErrorHandler", definition);
             Processor output = channel.getOutput();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index d7b15de..e0caeb4 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.reifier;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ResequenceDefinition;
 import org.apache.camel.model.config.BatchResequencerConfig;
@@ -72,10 +76,12 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(true);
         Expression expression = createExpression(definition.getExpression());
 
-        // TODO: Make this via SPI or some facade
         // and wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", processor);
+        args.put("route", route);
+        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
 
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
@@ -83,7 +89,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         boolean isReverse = parseBoolean(config.getReverse(), false);
         boolean isAllowDuplicates = parseBoolean(config.getAllowDuplicates(), false);
 
-        Resequencer resequencer = new Resequencer(camelContext, internal, expression, isAllowDuplicates, isReverse);
+        Resequencer resequencer = new Resequencer(camelContext, target, expression, isAllowDuplicates, isReverse);
         resequencer.setBatchSize(parseInt(config.getBatchSize()));
         resequencer.setBatchTimeout(parseDuration(config.getBatchTimeout()));
         resequencer.setReverse(isReverse);
@@ -105,8 +111,11 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(true);
         Expression expression = createExpression(definition.getExpression());
 
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", processor);
+        args.put("route", route);
+        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
 
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
@@ -122,7 +131,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         }
         comparator.setExpression(expression);
 
-        StreamResequencer resequencer = new StreamResequencer(camelContext, internal, comparator, expression);
+        StreamResequencer resequencer = new StreamResequencer(camelContext, target, comparator, expression);
         resequencer.setTimeout(parseDuration(config.getTimeout()));
         if (config.getDeliveryAttemptInterval() != null) {
             resequencer.setDeliveryAttemptInterval(parseDuration(config.getDeliveryAttemptInterval()));
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 6a9c25c..ddb3bee 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -32,7 +32,6 @@ import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ShutdownRoute;
 import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.PropertyDefinition;
 import org.apache.camel.model.RouteDefinition;
@@ -40,6 +39,7 @@ import org.apache.camel.processor.ContractAdvice;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.reifier.rest.RestBindingReifier;
 import org.apache.camel.spi.Contract;
+import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.RoutePolicy;
@@ -236,10 +236,12 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
         // handles preparing the response from the exchange in regard to IN vs OUT messages etc
         Processor target = new Pipeline(camelContext, eventDrivenProcessors);
 
-        // TODO: Make this via SPI or some facade
         // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", target);
+        args.put("route", route);
+        InternalProcessor internal = (InternalProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "CamelInternalProcessor", args);
 
         // and then optionally add route policy processor if a custom policy is set
         List<RoutePolicy> routePolicyList = route.getRoutePolicyList();
@@ -256,21 +258,20 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
                 }
             }
 
-            internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
+            internal.addRoutePolicyAdvice(routePolicyList);
         }
 
         // wrap in route inflight processor to track number of inflight exchanges for the route
-        internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(
-                camelContext.getInflightRepository(), route.getRouteId()));
+        internal.addRouteInflightRepositoryAdvice(camelContext.getInflightRepository(), route.getRouteId());
 
         // wrap in JMX instrumentation processor that is used for performance stats
         ManagementInterceptStrategy managementInterceptStrategy = route.getManagementInterceptStrategy();
         if (managementInterceptStrategy != null) {
-            internal.addAdvice(CamelInternalProcessor.wrap(managementInterceptStrategy.createProcessor("route")));
+            internal.addManagementInterceptStrategy(managementInterceptStrategy.createProcessor("route"));
         }
 
         // wrap in route lifecycle
-        internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
+        internal.addRouteLifecycleAdvice();
 
         // add advices
         if (definition.getRestBindingDefinition() != null) {
@@ -308,16 +309,8 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
             route.setAutoStartup(isAutoStartup);
         }
 
-        // after the route is created then set the route on the policy processor so we get hold of it
-        CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
-        if (task != null) {
-            task.setRoute(route);
-        }
-        CamelInternalProcessor.RouteLifecycleAdvice task2
-                = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class);
-        if (task2 != null) {
-            task2.setRoute(route);
-        }
+        // after the route is created then set the route on the policy processor(s) so we get hold of it
+        internal.setRouteOnAdvices(route);
 
         // invoke init on route policy
         if (routePolicyList != null && !routePolicyList.isEmpty()) {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index c14b092..f0cb9fd 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -16,13 +16,16 @@
  */
 package org.apache.camel.reifier;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.SetHeaderDefinition;
 import org.apache.camel.model.WireTapDefinition;
@@ -48,18 +51,20 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor();
 
         // create error handler we need to use for processing the wire tapped
-        Processor target = wrapInErrorHandler(dynamicTo, true);
+        Processor childProcessor = wrapInErrorHandler(dynamicTo, true);
 
-        // TODO: Make this via SPI or some facade
         // and wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        Map<String, Object> args = new HashMap<>();
+        args.put("processor", childProcessor);
+        args.put("route", route);
+        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
 
         // is true by default
         boolean isCopy = parseBoolean(definition.getCopy(), true);
 
         WireTapProcessor answer = new WireTapProcessor(
-                dynamicTo, internal,
+                dynamicTo, target,
                 parse(ExchangePattern.class, definition.getPattern()),
                 threadPool, shutdownThreadPool,
                 parseBoolean(definition.getDynamicUri(), true));
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
index ba17095..0bc0f6b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
@@ -21,12 +21,12 @@ import java.util.List;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
+import org.apache.camel.impl.engine.DefaultChannel;
 import org.apache.camel.model.LoadBalanceDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.SendDefinition;
 import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition;
-import org.apache.camel.processor.channel.DefaultChannel;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
index 5499007..1951fd4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
@@ -23,8 +23,8 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.engine.DefaultChannel;
 import org.apache.camel.impl.engine.DefaultRoute;
-import org.apache.camel.processor.channel.DefaultChannel;
 import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;