You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/26 07:07:48 UTC

[camel] branch master updated: CAMEL-15753: camel-core - Modularize and move internal processors into their own factory.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d2f9f03  CAMEL-15753: camel-core - Modularize and move internal processors into their own factory.
d2f9f03 is described below

commit d2f9f03e0cd3de3fe5097c708866ed5d02d00bc4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 26 08:07:14 2020 +0100

    CAMEL-15753: camel-core - Modularize and move internal processors into their own factory.
---
 .../org/apache/camel/ExtendedCamelContext.java     | 15 +++++
 .../org/apache/camel/spi/InternalProcessor.java    |  3 +-
 .../apache/camel/spi/InternalProcessorFactory.java | 54 ++++++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    | 29 +++++++++-
 .../apache/camel/impl/engine/DefaultChannel.java   |  8 +--
 .../camel/impl/engine/DefaultProducerCache.java    | 15 +----
 .../camel/impl/engine/SimpleCamelContext.java      | 10 ++++
 .../impl/engine/SubscribeMethodProcessor.java      |  8 +--
 .../camel/impl/ExtendedCamelContextConfigurer.java |  5 ++
 .../camel/impl/lw/LightweightCamelContext.java     | 11 ++++
 .../impl/lw/LightweightRuntimeCamelContext.java    | 17 +++++-
 .../org/apache/camel/internal-processor-factory    |  2 +
 .../processor/DefaultInternalProcessorFactory.java | 66 ++++++++++++++++++++++
 .../InterceptSendToEndpointProcessor.java          |  2 +-
 .../apache/camel/processor/UnitOfWorkProducer.java | 14 +----
 .../org/apache/camel/reifier/AggregateReifier.java |  9 +--
 .../apache/camel/reifier/OnCompletionReifier.java  |  9 +--
 .../org/apache/camel/reifier/ProcessorReifier.java |  4 +-
 .../apache/camel/reifier/ResequenceReifier.java    | 17 ++----
 .../org/apache/camel/reifier/RouteReifier.java     |  7 +--
 .../org/apache/camel/reifier/WireTapReifier.java   |  9 +--
 .../support/DefaultInterceptSendToEndpoint.java    | 11 +---
 22 files changed, 232 insertions(+), 93 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 6700d00..25e4817 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -40,6 +40,7 @@ import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.LogListener;
@@ -357,6 +358,20 @@ public interface ExtendedCamelContext extends CamelContext {
     void setProcessorFactory(ProcessorFactory processorFactory);
 
     /**
+     * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+     *
+     * @return the factory
+     */
+    InternalProcessorFactory getInternalProcessorFactory();
+
+    /**
+     * Sets a custom {@link org.apache.camel.spi.InternalProcessorFactory}
+     *
+     * @param internalProcessorFactory the custom factory
+     */
+    void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory);
+
+    /**
      * Gets the current {@link org.apache.camel.spi.InterceptEndpointFactory}
      *
      * @return the factory
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
index f5fd933..1275ef6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
@@ -18,6 +18,7 @@ package org.apache.camel.spi;
 
 import java.util.List;
 
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 
@@ -36,7 +37,7 @@ import org.apache.camel.Route;
  * </ul>
  * ... and more.
  */
-public interface InternalProcessor extends Processor {
+public interface InternalProcessor extends AsyncProcessor {
 
     /**
      * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
new file mode 100644
index 0000000..c619239
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+
+/**
+ * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is
+ * used to have loose coupling between the modules in core. Camel end users should only use {@link ProcessorFactory}.
+ *
+ * @see ProcessorFactory
+ */
+public interface InternalProcessorFactory {
+
+    /**
+     * Service factory key.
+     */
+    String FACTORY = "internal-processor-factory";
+
+    InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route);
+
+    SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext);
+
+    Channel createChannel(CamelContext camelContext);
+
+    AsyncProducer createInterceptSendToEndpointProcessor(
+            InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip);
+
+    AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped);
+
+    AsyncProducer createUnitOfWorkProducer(Producer producer);
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index dcf5a1d..88633e8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -104,6 +104,7 @@ import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -275,6 +276,7 @@ public abstract class AbstractCamelContext extends BaseService
     private volatile PackageScanResourceResolver packageScanResourceResolver;
     private volatile NodeIdFactory nodeIdFactory;
     private volatile ProcessorFactory processorFactory;
+    private volatile InternalProcessorFactory internalProcessorFactory;
     private volatile InterceptEndpointFactory interceptEndpointFactory;
     private volatile RouteFactory routeFactory;
     private volatile MessageHistoryFactory messageHistoryFactory;
@@ -3252,19 +3254,23 @@ public abstract class AbstractCamelContext extends BaseService
         getModelToXMLDumper();
         getClassResolver();
         getNodeIdFactory();
-        getProcessorFactory();
         getModelJAXBContextFactory();
         getUuidGenerator();
         getUnitOfWorkFactory();
         getRouteController();
         try {
+            getProcessorFactory();
+            getInternalProcessorFactory();
+        } catch (IllegalArgumentException e) {
+            // ignore in case camel-core-processor is not on the classpath
+        }
+        try {
             getBeanProxyFactory();
             getBeanProcessorFactory();
         } catch (IllegalArgumentException e) {
             // ignore in case camel-bean is not on the classpath
         }
         getBeanPostProcessor();
-        getProcessorFactory();
     }
 
     /**
@@ -3806,6 +3812,23 @@ public abstract class AbstractCamelContext extends BaseService
     }
 
     @Override
+    public InternalProcessorFactory getInternalProcessorFactory() {
+        if (internalProcessorFactory == null) {
+            synchronized (lock) {
+                if (internalProcessorFactory == null) {
+                    setInternalProcessorFactory(createInternalProcessorFactory());
+                }
+            }
+        }
+        return internalProcessorFactory;
+    }
+
+    @Override
+    public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+        this.internalProcessorFactory = doAddService(internalProcessorFactory);
+    }
+
+    @Override
     public InterceptEndpointFactory getInterceptEndpointFactory() {
         if (interceptEndpointFactory == null) {
             synchronized (lock) {
@@ -4231,6 +4254,8 @@ public abstract class AbstractCamelContext extends BaseService
 
     protected abstract ProcessorFactory createProcessorFactory();
 
+    protected abstract InternalProcessorFactory createInternalProcessorFactory();
+
     protected abstract InterceptEndpointFactory createInterceptEndpointFactory();
 
     protected abstract RouteFactory createRouteFactory();
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index a36bd4d..3f1e5c6 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -239,11 +238,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
             }
             if (!(wrapped instanceof WrapAwareProcessor)) {
                 // wrap the target so it becomes a service and we can manage its lifecycle
-                Map<String, Object> args = new HashMap<>();
-                args.put("processor", wrapped);
-                args.put("wrapped", target);
-                wrapped = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                        .createProcessor(camelContext, "WrapProcessor", args);
+                wrapped = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                        .createWrapProcessor(wrapped, target);
             }
             target = wrapped;
         }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index 19bb01b..ff92f1a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -30,7 +30,6 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StatefulService;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ProducerCache;
@@ -81,18 +80,8 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
         }
 
         // internal processor used for sending
-        try {
-            sharedInternalProcessor
-                    = (SharedInternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
-                            "SharedCamelInternalProcessor", null);
-        } catch (Exception e) {
-            throw RuntimeCamelException.wrapRuntimeException(e);
-        }
-        if (sharedInternalProcessor == null) {
-            throw new IllegalStateException(
-                    "Cannot create SharedCamelInternalProcessor from ProcessorFactory." +
-                                            "If you have a custom ProcessorFactory then extend DefaultProcessorFactory and let the default able to create SharedCamelInternalProcessor");
-        }
+        sharedInternalProcessor
+                = this.camelContext.getInternalProcessorFactory().createSharedCamelInternalProcessor(this.camelContext);
     }
 
     protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index f2a9da4..bcc6e0f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -47,6 +47,7 @@ import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptEndpointFactory;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
@@ -204,6 +205,15 @@ public class SimpleCamelContext extends AbstractCamelContext {
     }
 
     @Override
+    protected InternalProcessorFactory createInternalProcessorFactory() {
+        return new BaseServiceResolver<>(InternalProcessorFactory.FACTORY, InternalProcessorFactory.class)
+                .resolve(getCamelContextReference())
+                .orElseThrow(() -> new IllegalArgumentException(
+                        "Cannot find InternalProcessorFactory on classpath. "
+                                                                + "Add camel-core-processor to classpath."));
+    }
+
+    @Override
     protected InterceptEndpointFactory createInterceptEndpointFactory() {
         return new DefaultInterceptEndpointFactory();
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index 8f3161e..4242740 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,11 +59,8 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem
                 .getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method);
 
         // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", answer);
-        args.put("route", null);
-        answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(endpoint.getCamelContext(), "UnitOfWorkProcessorAdvice", args);
+        answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(endpoint.getCamelContext(), answer, null);
         Predicate p;
         if (ObjectHelper.isEmpty(predicate)) {
             p = PredicateBuilder.constant(true);
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 490fe14..e93005b 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -44,6 +44,7 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         map.put("InflightRepository", org.apache.camel.spi.InflightRepository.class);
         map.put("Injector", org.apache.camel.spi.Injector.class);
         map.put("InterceptEndpointFactory", org.apache.camel.spi.InterceptEndpointFactory.class);
+        map.put("InternalProcessorFactory", org.apache.camel.spi.InternalProcessorFactory.class);
         map.put("LanguageResolver", org.apache.camel.spi.LanguageResolver.class);
         map.put("LoadTypeConverters", java.lang.Boolean.class);
         map.put("LogExhaustedMessageBody", java.lang.Boolean.class);
@@ -149,6 +150,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "Injector": target.setInjector(property(camelContext, org.apache.camel.spi.Injector.class, value)); return true;
         case "interceptendpointfactory":
         case "InterceptEndpointFactory": target.setInterceptEndpointFactory(property(camelContext, org.apache.camel.spi.InterceptEndpointFactory.class, value)); return true;
+        case "internalprocessorfactory":
+        case "InternalProcessorFactory": target.setInternalProcessorFactory(property(camelContext, org.apache.camel.spi.InternalProcessorFactory.class, value)); return true;
         case "languageresolver":
         case "LanguageResolver": target.setLanguageResolver(property(camelContext, org.apache.camel.spi.LanguageResolver.class, value)); return true;
         case "loadtypeconverters":
@@ -306,6 +309,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "Injector": return target.getInjector();
         case "interceptendpointfactory":
         case "InterceptEndpointFactory": return target.getInterceptEndpointFactory();
+        case "internalprocessorfactory":
+        case "InternalProcessorFactory": return target.getInternalProcessorFactory();
         case "languageresolver":
         case "LanguageResolver": return target.getLanguageResolver();
         case "loadtypeconverters":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 9cae189..e513636 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -96,6 +96,7 @@ import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -1238,6 +1239,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
     }
 
     @Override
+    public InternalProcessorFactory getInternalProcessorFactory() {
+        return getExtendedCamelContext().getInternalProcessorFactory();
+    }
+
+    @Override
+    public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+        getExtendedCamelContext().setInternalProcessorFactory(internalProcessorFactory);
+    }
+
+    @Override
     public InterceptEndpointFactory getInterceptEndpointFactory() {
         return getExtendedCamelContext().getInterceptEndpointFactory();
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index 6c11515..a61a186 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -92,6 +92,7 @@ import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptEndpointFactory;
 import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -169,6 +170,8 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     private final ClassLoader applicationContextClassLoader;
     private final UnitOfWorkFactory unitOfWorkFactory;
     private final RouteController routeController;
+    private final ProcessorFactory processorFactory;
+    private final InternalProcessorFactory internalProcessorFactory;
     private final InflightRepository inflightRepository;
     private final Injector injector;
     private final ClassResolver classResolver;
@@ -209,6 +212,8 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
         shutdownStrategy = context.getShutdownStrategy();
         applicationContextClassLoader = context.getApplicationContextClassLoader();
         unitOfWorkFactory = context.adapt(ExtendedCamelContext.class).getUnitOfWorkFactory();
+        processorFactory = context.adapt(ExtendedCamelContext.class).getProcessorFactory();
+        internalProcessorFactory = context.adapt(ExtendedCamelContext.class).getInternalProcessorFactory();
         routeController = context.getRouteController();
         inflightRepository = context.getInflightRepository();
         globalOptions = context.getGlobalOptions();
@@ -1365,7 +1370,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
 
     @Override
     public ProcessorFactory getProcessorFactory() {
-        throw new UnsupportedOperationException();
+        return processorFactory;
     }
 
     @Override
@@ -1374,6 +1379,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     }
 
     @Override
+    public InternalProcessorFactory getInternalProcessorFactory() {
+        return internalProcessorFactory;
+    }
+
+    @Override
+    public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public InterceptEndpointFactory getInterceptEndpointFactory() {
         throw new UnsupportedOperationException();
     }
diff --git a/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory
new file mode 100644
index 0000000..5a0fc49
--- /dev/null
+++ b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.DefaultInternalProcessorFactory
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
new file mode 100644
index 0000000..4f8acf8
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.impl.engine.DefaultChannel;
+import org.apache.camel.spi.InterceptSendToEndpoint;
+import org.apache.camel.spi.InternalProcessor;
+import org.apache.camel.spi.InternalProcessorFactory;
+import org.apache.camel.spi.SharedInternalProcessor;
+import org.apache.camel.spi.annotations.JdkService;
+
+@JdkService(InternalProcessorFactory.FACTORY)
+public class DefaultInternalProcessorFactory implements InternalProcessorFactory {
+
+    public InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route) {
+        CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
+        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+        return internal;
+    }
+
+    public SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext) {
+        return new SharedCamelInternalProcessor(
+                camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
+    }
+
+    public Channel createChannel(CamelContext camelContext) {
+        return new DefaultChannel(camelContext);
+    }
+
+    public AsyncProducer createInterceptSendToEndpointProcessor(
+            InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) {
+        return new InterceptSendToEndpointProcessor(endpoint, delegate, producer, skip);
+    }
+
+    public AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped) {
+        return new WrapProcessor(processor, wrapped);
+    }
+
+    public AsyncProducer createUnitOfWorkProducer(Producer producer) {
+        return new UnitOfWorkProducer(producer);
+    }
+
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 595a497..6a8187b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -48,7 +48,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
     private final boolean skip;
 
     public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer,
-                                            boolean skip) throws Exception {
+                                            boolean skip) {
         super(delegate);
         this.endpoint = endpoint;
         this.delegate = delegate;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index 242fbdd..46e01e3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,16 +16,12 @@
  */
 package org.apache.camel.processor;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.service.ServiceHelper;
 
@@ -46,16 +42,8 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
         super(producer.getEndpoint());
         this.producer = producer;
         // wrap in unit of work
-
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", producer);
-        args.put("route", null);
         ExtendedCamelContext ecc = producer.getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class);
-        try {
-            this.processor = (AsyncProcessor) ecc.getProcessorFactory().createProcessor(ecc, "UnitOfWorkProcessorAdvice", args);
-        } catch (Exception e) {
-            throw RuntimeCamelException.wrapRuntimeException(e);
-        }
+        this.processor = ecc.getInternalProcessorFactory().addUnitOfWorkProcessorAdvice(ecc, producer, null);
     }
 
     @Override
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index e852aab..cef3a42 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.reifier;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -54,11 +52,8 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         Processor childProcessor = this.createChildProcessor(true);
 
         // wrap the aggregate route in a unit of work processor
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", childProcessor);
-        args.put("route", route);
-        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+        AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         Expression correlation = createExpression(definition.getExpression());
         AggregationStrategy strategy = createAggregationStrategy();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 32f5ce9..497bf4c 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.reifier;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncProcessor;
@@ -55,11 +53,8 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
         Processor childProcessor = this.createChildProcessor(true);
 
         // wrap the on completion route in a unit of work processor
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", childProcessor);
-        args.put("route", route);
-        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+        AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         route.setOnCompletion(getId(definition), target);
 
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index bc7288b..f7a0546 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -529,8 +529,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
     protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
             throws Exception {
         // put a channel in between this and each output to control the route flow logic
-        Channel channel = (Channel) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "DefaultChannel", null);
+        Channel channel = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .createChannel(camelContext);
 
         // add interceptor strategies to the channel must be in this order:
         // camel context, route context, local
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index e0caeb4..86db86a 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.reifier;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
@@ -77,11 +74,8 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Expression expression = createExpression(definition.getExpression());
 
         // and wrap in unit of work
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", processor);
-        args.put("route", route);
-        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+        AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
 
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
@@ -111,11 +105,8 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(true);
         Expression expression = createExpression(definition.getExpression());
 
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", processor);
-        args.put("route", route);
-        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+        AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
 
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index ddb3bee..1ac63f5 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -237,11 +237,8 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
         Processor target = new Pipeline(camelContext, eventDrivenProcessors);
 
         // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", target);
-        args.put("route", route);
-        InternalProcessor internal = (InternalProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "CamelInternalProcessor", args);
+        InternalProcessor internal = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, target, route);
 
         // and then optionally add route policy processor if a custom policy is set
         List<RoutePolicy> routePolicyList = route.getRoutePolicyList();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index f0cb9fd..78517fb 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.reifier;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncProcessor;
@@ -54,11 +52,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         Processor childProcessor = wrapInErrorHandler(dynamicTo, true);
 
         // and wrap in unit of work
-        Map<String, Object> args = new HashMap<>();
-        args.put("processor", childProcessor);
-        args.put("route", route);
-        AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+        AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
 
         // is true by default
         boolean isCopy = parseBoolean(definition.getCopy(), true);
diff --git a/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 454a0e6..74725c9 100644
--- a/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++ b/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.support;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.AsyncProducer;
@@ -126,14 +125,8 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint,
     @Override
     public AsyncProducer createAsyncProducer() throws Exception {
         AsyncProducer producer = delegate.createAsyncProducer();
-
-        Map<String, Object> args = new HashMap<>();
-        args.put("endpoint", this);
-        args.put("delegate", delegate);
-        args.put("producer", producer);
-        args.put("skip", skip);
-        return (AsyncProducer) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
-                .createProcessor(camelContext, "InterceptSendToEndpointProcessor", args);
+        return camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+                .createInterceptSendToEndpointProcessor(this, delegate, producer, skip);
     }
 
     @Override