You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/02 06:26:31 UTC

[camel] 03/04: CAMEL-15105: make the InternalProcessorFactory a plugin of the context

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 525b70ef436fb8adf113d82ad79e1c3527570c17
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Apr 1 14:31:47 2023 +0200

    CAMEL-15105: make the InternalProcessorFactory a plugin of the context
---
 .../java/org/apache/camel/ExtendedCamelContext.java    | 15 ---------------
 .../apache/camel/impl/engine/AbstractCamelContext.java |  2 +-
 .../impl/engine/DefaultCamelContextExtension.java      | 18 ------------------
 .../org/apache/camel/impl/engine/DefaultChannel.java   |  3 ++-
 .../camel/impl/engine/SubscribeMethodProcessor.java    |  3 ++-
 .../camel/impl/ExtendedCamelContextConfigurer.java     |  6 ------
 .../impl/lw/LightweightCamelContextExtension.java      | 11 -----------
 .../org/apache/camel/processor/MulticastProcessor.java |  3 ++-
 .../org/apache/camel/processor/UnitOfWorkProducer.java |  3 ++-
 .../org/apache/camel/reifier/AggregateReifier.java     |  3 ++-
 .../org/apache/camel/reifier/OnCompletionReifier.java  |  3 ++-
 .../org/apache/camel/reifier/ProcessorReifier.java     |  2 +-
 .../org/apache/camel/reifier/ResequenceReifier.java    |  5 +++--
 .../java/org/apache/camel/reifier/RouteReifier.java    |  3 ++-
 .../java/org/apache/camel/reifier/WireTapReifier.java  |  3 ++-
 .../reifier/errorhandler/ErrorHandlerReifier.java      |  3 ++-
 .../camel/support/DefaultInterceptSendToEndpoint.java  |  2 +-
 .../java/org/apache/camel/support/PluginHelper.java    | 18 ++++++++++++++++++
 .../camel/support/cache/DefaultProducerCache.java      |  5 +++--
 19 files changed, 45 insertions(+), 66 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 50f9a58a498..c1850cfb5c7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -37,7 +37,6 @@ import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.LogListener;
 import org.apache.camel.spi.ManagementMBeanAssembler;
@@ -316,20 +315,6 @@ public interface ExtendedCamelContext {
      */
     FactoryFinder getFactoryFinder(String path);
 
-    /**
-     * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
-     *
-     * @return the factory
-     */
-    InternalProcessorFactory getInternalProcessorFactory();
-
-    /**
-     * Sets a custom {@link org.apache.camel.spi.InternalProcessorFactory}
-     *
-     * @param internalProcessorFactory the custom factory
-     */
-    void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory);
-
     /**
      * Gets the current {@link org.apache.camel.spi.InterceptEndpointFactory}
      *
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 47609926f46..2e288f9763c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -221,7 +221,6 @@ public abstract class AbstractCamelContext extends BaseService
     volatile ModelToXMLDumper modelToXMLDumper;
     volatile RestBindingJaxbDataFormatFactory restBindingJaxbDataFormatFactory;
     volatile RuntimeCamelCatalog runtimeCamelCatalog;
-    volatile InternalProcessorFactory internalProcessorFactory;
     volatile InterceptEndpointFactory interceptEndpointFactory;
     volatile RouteFactory routeFactory;
     volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager;
@@ -380,6 +379,7 @@ public abstract class AbstractCamelContext extends BaseService
         camelContextExtension.lazyAddContextPlugin(HealthCheckResolver.class, this::createHealthCheckResolver);
         camelContextExtension.lazyAddContextPlugin(DevConsoleResolver.class, this::createDevConsoleResolver);
         camelContextExtension.lazyAddContextPlugin(ProcessorFactory.class, this::createProcessorFactory);
+        camelContextExtension.lazyAddContextPlugin(InternalProcessorFactory.class, this::createInternalProcessorFactory);
 
         if (build) {
             try {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index 7e5f6e45a66..482ef419e07 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -54,7 +54,6 @@ import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.LogListener;
 import org.apache.camel.spi.ManagementMBeanAssembler;
@@ -466,23 +465,6 @@ class DefaultCamelContextExtension implements ExtendedCamelContext {
         this.lightweight = lightweight;
     }
 
-    @Override
-    public InternalProcessorFactory getInternalProcessorFactory() {
-        if (camelContext.internalProcessorFactory == null) {
-            synchronized (camelContext.lock) {
-                if (camelContext.internalProcessorFactory == null) {
-                    setInternalProcessorFactory(camelContext.createInternalProcessorFactory());
-                }
-            }
-        }
-        return camelContext.internalProcessorFactory;
-    }
-
-    @Override
-    public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
-        camelContext.internalProcessorFactory = camelContext.getInternalServiceManager().addService(internalProcessorFactory);
-    }
-
     @Override
     public InterceptEndpointFactory getInterceptEndpointFactory() {
         if (camelContext.interceptEndpointFactory == null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index f0ab9329db5..50cc9201a28 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -40,6 +40,7 @@ import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.Tracer;
 import org.apache.camel.spi.WrapAwareProcessor;
 import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -223,7 +224,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
             }
             if (!(wrapped instanceof WrapAwareProcessor)) {
                 // wrap the target so it becomes a service and we can manage its lifecycle
-                wrapped = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+                wrapped = PluginHelper.getInternalProcessorFactory(camelContext)
                         .createWrapProcessor(wrapped, target);
             }
             target = wrapped;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index 813ccdc85b5..48dcbf336c9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -31,6 +31,7 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Language;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.builder.PredicateBuilder;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -58,7 +59,7 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem
                 .getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method);
 
         // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
-        answer = endpoint.getCamelContext().getCamelContextExtension().getInternalProcessorFactory()
+        answer = PluginHelper.getInternalProcessorFactory(endpoint.getCamelContext())
                 .addUnitOfWorkProcessorAdvice(endpoint.getCamelContext(), answer, null);
         Predicate p;
         if (ObjectHelper.isEmpty(predicate)) {
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 244fd82d8c2..d4a7548ee8a 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -47,8 +47,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "HeadersMapFactory": target.setHeadersMapFactory(property(camelContext, org.apache.camel.spi.HeadersMapFactory.class, value)); return true;
         case "interceptendpointfactory":
         case "InterceptEndpointFactory": target.setInterceptEndpointFactory(property(camelContext, org.apache.camel.spi.InterceptEndpointFactory.class, value)); return true;
-        case "internalprocessorfactory":
-        case "InternalProcessorFactory": target.setInternalProcessorFactory(property(camelContext, org.apache.camel.spi.InternalProcessorFactory.class, value)); return true;
         case "lightweight":
         case "Lightweight": target.setLightweight(property(camelContext, boolean.class, value)); return true;
         case "modeltoxmldumper":
@@ -108,8 +106,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "HeadersMapFactory": return org.apache.camel.spi.HeadersMapFactory.class;
         case "interceptendpointfactory":
         case "InterceptEndpointFactory": return org.apache.camel.spi.InterceptEndpointFactory.class;
-        case "internalprocessorfactory":
-        case "InternalProcessorFactory": return org.apache.camel.spi.InternalProcessorFactory.class;
         case "lightweight":
         case "Lightweight": return boolean.class;
         case "modeltoxmldumper":
@@ -170,8 +166,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "HeadersMapFactory": return target.getHeadersMapFactory();
         case "interceptendpointfactory":
         case "InterceptEndpointFactory": return target.getInterceptEndpointFactory();
-        case "internalprocessorfactory":
-        case "InternalProcessorFactory": return target.getInternalProcessorFactory();
         case "lightweight":
         case "Lightweight": return target.isLightweight();
         case "modeltoxmldumper":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
index e280a82d6dc..14d389a7e31 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
@@ -52,7 +52,6 @@ import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LogListener;
 import org.apache.camel.spi.ManagementMBeanAssembler;
 import org.apache.camel.spi.ModelToXMLDumper;
@@ -242,16 +241,6 @@ class LightweightCamelContextExtension implements ExtendedCamelContext {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public InternalProcessorFactory getInternalProcessorFactory() {
-        return camelContext.getCamelContextExtension().getInternalProcessorFactory();
-    }
-
-    @Override
-    public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public InterceptEndpointFactory getInterceptEndpointFactory() {
         throw new UnsupportedOperationException();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index efdb733fb50..c0a7fd9df6b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -66,6 +66,7 @@ import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
@@ -186,7 +187,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
                               boolean parallelAggregate) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
-        this.internalProcessorFactory = camelContext.getCamelContextExtension().getInternalProcessorFactory();
+        this.internalProcessorFactory = PluginHelper.getInternalProcessorFactory(camelContext);
         this.route = route;
         this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
         this.processors = processors;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index b47529114a0..f3e893a7cca 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
@@ -43,7 +44,7 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
         this.producer = producer;
         // wrap in unit of work
         CamelContext ecc = producer.getEndpoint().getCamelContext();
-        this.processor = ecc.getCamelContextExtension().getInternalProcessorFactory()
+        this.processor = PluginHelper.getInternalProcessorFactory(ecc)
                 .addUnitOfWorkProcessorAdvice(ecc, producer, null);
     }
 
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 6f1f3117d08..228157e413b 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -33,6 +33,7 @@ import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.support.PluginHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         Processor childProcessor = this.createChildProcessor(true);
 
         // wrap the aggregate route in a unit of work processor
-        AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         Expression correlation = createExpression(definition.getExpression());
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index b3e93f58c65..96753213850 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -26,6 +26,7 @@ import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnCompletionMode;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.OnCompletionProcessor;
+import org.apache.camel.support.PluginHelper;
 
 public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition> {
 
@@ -52,7 +53,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
         Processor childProcessor = this.createChildProcessor(true);
 
         // wrap the on completion route in a unit of work processor
-        AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         route.setOnCompletion(getId(definition), target);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 41096b48c89..7d6209606c9 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -630,7 +630,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
     protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
             throws Exception {
         // put a channel in between this and each output to control the route flow logic
-        Channel channel = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        Channel channel = PluginHelper.getInternalProcessorFactory(camelContext)
                 .createChannel(camelContext);
 
         // add interceptor strategies to the channel must be in this order:
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index 5a2d46068e5..bc85e2c9cf8 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -29,6 +29,7 @@ import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.StreamResequencer;
 import org.apache.camel.processor.resequencer.DefaultExchangeComparator;
 import org.apache.camel.processor.resequencer.ExpressionResultComparator;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.util.ObjectHelper;
 
 public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
@@ -73,7 +74,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Expression expression = createExpression(definition.getExpression());
 
         // and wrap in unit of work
-        AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
 
         ObjectHelper.notNull(config, "config", this);
@@ -110,7 +111,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(true);
         Expression expression = createExpression(definition.getExpression());
 
-        AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
 
         ObjectHelper.notNull(config, "config", this);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index e72df220d57..45a0f6584c1 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -49,6 +49,7 @@ import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.support.PluginHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,7 +233,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
         target.setRouteId(id);
 
         // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
-        InternalProcessor internal = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        InternalProcessor internal = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, target, route);
 
         // configure route policy
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 34096a72f64..d8562141286 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -35,6 +35,7 @@ import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.LanguageSupport;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.util.StringHelper;
 
 public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
@@ -86,7 +87,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         Processor childProcessor = wrapInErrorHandler(producer);
 
         // and wrap in unit of work
-        AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
                 .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         // is true by default
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
index 985d2665cab..ec6b3d80297 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
@@ -44,6 +44,7 @@ import org.apache.camel.reifier.AbstractReifier;
 import org.apache.camel.spi.ErrorHandler;
 import org.apache.camel.spi.Language;
 import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.util.ObjectHelper;
 
 public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends AbstractReifier {
@@ -477,7 +478,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends
         }
         if (processor != null) {
             // must wrap the processor in an UoW
-            processor = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+            processor = PluginHelper.getInternalProcessorFactory(camelContext)
                     .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
         }
         return processor;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 669b066e05a..b35c1ef0893 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -134,7 +134,7 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint,
     @Override
     public AsyncProducer createAsyncProducer() throws Exception {
         AsyncProducer producer = delegate.createAsyncProducer();
-        return camelContext.getCamelContextExtension().getInternalProcessorFactory()
+        return PluginHelper.getInternalProcessorFactory(camelContext)
                 .createInterceptSendToEndpointProcessor(this, delegate, producer, skip);
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
index 30ddec8e04e..5396ed47732 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.ComponentResolver;
 import org.apache.camel.spi.ConfigurerResolver;
 import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.ModelineFactory;
@@ -365,4 +366,21 @@ public final class PluginHelper {
         return extendedCamelContext.getContextPlugin(ProcessorFactory.class);
     }
 
+    /**
+     * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+     *
+     * @return the factory
+     */
+    public static InternalProcessorFactory getInternalProcessorFactory(CamelContext camelContext) {
+        return getInternalProcessorFactory(camelContext.getCamelContextExtension());
+    }
+
+    /**
+     * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+     *
+     * @return the factory
+     */
+    public static InternalProcessorFactory getInternalProcessorFactory(ExtendedCamelContext extendedCamelContext) {
+        return extendedCamelContext.getContextPlugin(InternalProcessorFactory.class);
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 17612316e62..16d0a986aa7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.SharedInternalProcessor;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
 import org.apache.camel.support.EventHelper;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.StopWatch;
@@ -79,8 +80,8 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
 
         // internal processor used for sending
         sharedInternalProcessor
-                = this.camelContext.getCamelContextExtension()
-                        .getInternalProcessorFactory().createSharedCamelInternalProcessor(camelContext);
+                = PluginHelper.getInternalProcessorFactory(this.camelContext)
+                        .createSharedCamelInternalProcessor(camelContext);
     }
 
     protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {