You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/02 06:26:31 UTC
[camel] 03/04: CAMEL-15105: make the InternalProcessorFactory a plugin of the context
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 525b70ef436fb8adf113d82ad79e1c3527570c17
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Apr 1 14:31:47 2023 +0200
CAMEL-15105: make the InternalProcessorFactory a plugin of the context
---
.../java/org/apache/camel/ExtendedCamelContext.java | 15 ---------------
.../apache/camel/impl/engine/AbstractCamelContext.java | 2 +-
.../impl/engine/DefaultCamelContextExtension.java | 18 ------------------
.../org/apache/camel/impl/engine/DefaultChannel.java | 3 ++-
.../camel/impl/engine/SubscribeMethodProcessor.java | 3 ++-
.../camel/impl/ExtendedCamelContextConfigurer.java | 6 ------
.../impl/lw/LightweightCamelContextExtension.java | 11 -----------
.../org/apache/camel/processor/MulticastProcessor.java | 3 ++-
.../org/apache/camel/processor/UnitOfWorkProducer.java | 3 ++-
.../org/apache/camel/reifier/AggregateReifier.java | 3 ++-
.../org/apache/camel/reifier/OnCompletionReifier.java | 3 ++-
.../org/apache/camel/reifier/ProcessorReifier.java | 2 +-
.../org/apache/camel/reifier/ResequenceReifier.java | 5 +++--
.../java/org/apache/camel/reifier/RouteReifier.java | 3 ++-
.../java/org/apache/camel/reifier/WireTapReifier.java | 3 ++-
.../reifier/errorhandler/ErrorHandlerReifier.java | 3 ++-
.../camel/support/DefaultInterceptSendToEndpoint.java | 2 +-
.../java/org/apache/camel/support/PluginHelper.java | 18 ++++++++++++++++++
.../camel/support/cache/DefaultProducerCache.java | 5 +++--
19 files changed, 45 insertions(+), 66 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 50f9a58a498..c1850cfb5c7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -37,7 +37,6 @@ import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.ManagementMBeanAssembler;
@@ -316,20 +315,6 @@ public interface ExtendedCamelContext {
*/
FactoryFinder getFactoryFinder(String path);
- /**
- * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
- *
- * @return the factory
- */
- InternalProcessorFactory getInternalProcessorFactory();
-
- /**
- * Sets a custom {@link org.apache.camel.spi.InternalProcessorFactory}
- *
- * @param internalProcessorFactory the custom factory
- */
- void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory);
-
/**
* Gets the current {@link org.apache.camel.spi.InterceptEndpointFactory}
*
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 47609926f46..2e288f9763c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -221,7 +221,6 @@ public abstract class AbstractCamelContext extends BaseService
volatile ModelToXMLDumper modelToXMLDumper;
volatile RestBindingJaxbDataFormatFactory restBindingJaxbDataFormatFactory;
volatile RuntimeCamelCatalog runtimeCamelCatalog;
- volatile InternalProcessorFactory internalProcessorFactory;
volatile InterceptEndpointFactory interceptEndpointFactory;
volatile RouteFactory routeFactory;
volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager;
@@ -380,6 +379,7 @@ public abstract class AbstractCamelContext extends BaseService
camelContextExtension.lazyAddContextPlugin(HealthCheckResolver.class, this::createHealthCheckResolver);
camelContextExtension.lazyAddContextPlugin(DevConsoleResolver.class, this::createDevConsoleResolver);
camelContextExtension.lazyAddContextPlugin(ProcessorFactory.class, this::createProcessorFactory);
+ camelContextExtension.lazyAddContextPlugin(InternalProcessorFactory.class, this::createInternalProcessorFactory);
if (build) {
try {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index 7e5f6e45a66..482ef419e07 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -54,7 +54,6 @@ import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.ManagementMBeanAssembler;
@@ -466,23 +465,6 @@ class DefaultCamelContextExtension implements ExtendedCamelContext {
this.lightweight = lightweight;
}
- @Override
- public InternalProcessorFactory getInternalProcessorFactory() {
- if (camelContext.internalProcessorFactory == null) {
- synchronized (camelContext.lock) {
- if (camelContext.internalProcessorFactory == null) {
- setInternalProcessorFactory(camelContext.createInternalProcessorFactory());
- }
- }
- }
- return camelContext.internalProcessorFactory;
- }
-
- @Override
- public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
- camelContext.internalProcessorFactory = camelContext.getInternalServiceManager().addService(internalProcessorFactory);
- }
-
@Override
public InterceptEndpointFactory getInterceptEndpointFactory() {
if (camelContext.interceptEndpointFactory == null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index f0ab9329db5..50cc9201a28 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -40,6 +40,7 @@ import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.Tracer;
import org.apache.camel.spi.WrapAwareProcessor;
import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,7 +224,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
}
if (!(wrapped instanceof WrapAwareProcessor)) {
// wrap the target so it becomes a service and we can manage its lifecycle
- wrapped = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ wrapped = PluginHelper.getInternalProcessorFactory(camelContext)
.createWrapProcessor(wrapped, target);
}
target = wrapped;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index 813ccdc85b5..48dcbf336c9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -31,6 +31,7 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.spi.Language;
import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.builder.PredicateBuilder;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
@@ -58,7 +59,7 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem
.getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method);
// must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
- answer = endpoint.getCamelContext().getCamelContextExtension().getInternalProcessorFactory()
+ answer = PluginHelper.getInternalProcessorFactory(endpoint.getCamelContext())
.addUnitOfWorkProcessorAdvice(endpoint.getCamelContext(), answer, null);
Predicate p;
if (ObjectHelper.isEmpty(predicate)) {
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 244fd82d8c2..d4a7548ee8a 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -47,8 +47,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "HeadersMapFactory": target.setHeadersMapFactory(property(camelContext, org.apache.camel.spi.HeadersMapFactory.class, value)); return true;
case "interceptendpointfactory":
case "InterceptEndpointFactory": target.setInterceptEndpointFactory(property(camelContext, org.apache.camel.spi.InterceptEndpointFactory.class, value)); return true;
- case "internalprocessorfactory":
- case "InternalProcessorFactory": target.setInternalProcessorFactory(property(camelContext, org.apache.camel.spi.InternalProcessorFactory.class, value)); return true;
case "lightweight":
case "Lightweight": target.setLightweight(property(camelContext, boolean.class, value)); return true;
case "modeltoxmldumper":
@@ -108,8 +106,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "HeadersMapFactory": return org.apache.camel.spi.HeadersMapFactory.class;
case "interceptendpointfactory":
case "InterceptEndpointFactory": return org.apache.camel.spi.InterceptEndpointFactory.class;
- case "internalprocessorfactory":
- case "InternalProcessorFactory": return org.apache.camel.spi.InternalProcessorFactory.class;
case "lightweight":
case "Lightweight": return boolean.class;
case "modeltoxmldumper":
@@ -170,8 +166,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "HeadersMapFactory": return target.getHeadersMapFactory();
case "interceptendpointfactory":
case "InterceptEndpointFactory": return target.getInterceptEndpointFactory();
- case "internalprocessorfactory":
- case "InternalProcessorFactory": return target.getInternalProcessorFactory();
case "lightweight":
case "Lightweight": return target.isLightweight();
case "modeltoxmldumper":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
index e280a82d6dc..14d389a7e31 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
@@ -52,7 +52,6 @@ import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptStrategy;
-import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.ManagementMBeanAssembler;
import org.apache.camel.spi.ModelToXMLDumper;
@@ -242,16 +241,6 @@ class LightweightCamelContextExtension implements ExtendedCamelContext {
throw new UnsupportedOperationException();
}
- @Override
- public InternalProcessorFactory getInternalProcessorFactory() {
- return camelContext.getCamelContextExtension().getInternalProcessorFactory();
- }
-
- @Override
- public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
- throw new UnsupportedOperationException();
- }
-
@Override
public InterceptEndpointFactory getInterceptEndpointFactory() {
throw new UnsupportedOperationException();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index efdb733fb50..c0a7fd9df6b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -66,6 +66,7 @@ import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
@@ -186,7 +187,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
boolean parallelAggregate) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
- this.internalProcessorFactory = camelContext.getCamelContextExtension().getInternalProcessorFactory();
+ this.internalProcessorFactory = PluginHelper.getInternalProcessorFactory(camelContext);
this.route = route;
this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
this.processors = processors;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index b47529114a0..f3e893a7cca 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
/**
@@ -43,7 +44,7 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
this.producer = producer;
// wrap in unit of work
CamelContext ecc = producer.getEndpoint().getCamelContext();
- this.processor = ecc.getCamelContextExtension().getInternalProcessorFactory()
+ this.processor = PluginHelper.getInternalProcessorFactory(ecc)
.addUnitOfWorkProcessorAdvice(ecc, producer, null);
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 6f1f3117d08..228157e413b 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -33,6 +33,7 @@ import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.support.PluginHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
Processor childProcessor = this.createChildProcessor(true);
// wrap the aggregate route in a unit of work processor
- AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
Expression correlation = createExpression(definition.getExpression());
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index b3e93f58c65..96753213850 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -26,6 +26,7 @@ import org.apache.camel.model.OnCompletionDefinition;
import org.apache.camel.model.OnCompletionMode;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.OnCompletionProcessor;
+import org.apache.camel.support.PluginHelper;
public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition> {
@@ -52,7 +53,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
Processor childProcessor = this.createChildProcessor(true);
// wrap the on completion route in a unit of work processor
- AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
route.setOnCompletion(getId(definition), target);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 41096b48c89..7d6209606c9 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -630,7 +630,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
throws Exception {
// put a channel in between this and each output to control the route flow logic
- Channel channel = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ Channel channel = PluginHelper.getInternalProcessorFactory(camelContext)
.createChannel(camelContext);
// add interceptor strategies to the channel must be in this order:
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index 5a2d46068e5..bc85e2c9cf8 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -29,6 +29,7 @@ import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.resequencer.DefaultExchangeComparator;
import org.apache.camel.processor.resequencer.ExpressionResultComparator;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.util.ObjectHelper;
public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
@@ -73,7 +74,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Expression expression = createExpression(definition.getExpression());
// and wrap in unit of work
- AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, processor, route);
ObjectHelper.notNull(config, "config", this);
@@ -110,7 +111,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Processor processor = this.createChildProcessor(true);
Expression expression = createExpression(definition.getExpression());
- AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, processor, route);
ObjectHelper.notNull(config, "config", this);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index e72df220d57..45a0f6584c1 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -49,6 +49,7 @@ import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.support.PluginHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -232,7 +233,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
target.setRouteId(id);
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
- InternalProcessor internal = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ InternalProcessor internal = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, target, route);
// configure route policy
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 34096a72f64..d8562141286 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -35,6 +35,7 @@ import org.apache.camel.processor.WireTapProcessor;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.LanguageSupport;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.util.StringHelper;
public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
@@ -86,7 +87,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
Processor childProcessor = wrapInErrorHandler(producer);
// and wrap in unit of work
- AsyncProcessor target = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
// is true by default
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
index 985d2665cab..ec6b3d80297 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
@@ -44,6 +44,7 @@ import org.apache.camel.reifier.AbstractReifier;
import org.apache.camel.spi.ErrorHandler;
import org.apache.camel.spi.Language;
import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.util.ObjectHelper;
public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends AbstractReifier {
@@ -477,7 +478,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends
}
if (processor != null) {
// must wrap the processor in an UoW
- processor = camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ processor = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, processor, route);
}
return processor;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 669b066e05a..b35c1ef0893 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -134,7 +134,7 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint,
@Override
public AsyncProducer createAsyncProducer() throws Exception {
AsyncProducer producer = delegate.createAsyncProducer();
- return camelContext.getCamelContextExtension().getInternalProcessorFactory()
+ return PluginHelper.getInternalProcessorFactory(camelContext)
.createInterceptSendToEndpointProcessor(this, delegate, producer, skip);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
index 30ddec8e04e..5396ed47732 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.ComponentResolver;
import org.apache.camel.spi.ConfigurerResolver;
import org.apache.camel.spi.DataFormatResolver;
import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ModelineFactory;
@@ -365,4 +366,21 @@ public final class PluginHelper {
return extendedCamelContext.getContextPlugin(ProcessorFactory.class);
}
+ /**
+ * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+ *
+ * @return the factory
+ */
+ public static InternalProcessorFactory getInternalProcessorFactory(CamelContext camelContext) {
+ return getInternalProcessorFactory(camelContext.getCamelContextExtension());
+ }
+
+ /**
+ * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+ *
+ * @return the factory
+ */
+ public static InternalProcessorFactory getInternalProcessorFactory(ExtendedCamelContext extendedCamelContext) {
+ return extendedCamelContext.getContextPlugin(InternalProcessorFactory.class);
+ }
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 17612316e62..16d0a986aa7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.SharedInternalProcessor;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
import org.apache.camel.support.EventHelper;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.StopWatch;
@@ -79,8 +80,8 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
// internal processor used for sending
sharedInternalProcessor
- = this.camelContext.getCamelContextExtension()
- .getInternalProcessorFactory().createSharedCamelInternalProcessor(camelContext);
+ = PluginHelper.getInternalProcessorFactory(this.camelContext)
+ .createSharedCamelInternalProcessor(camelContext);
}
protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {