You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/26 07:07:48 UTC
[camel] branch master updated: CAMEL-15753: camel-core - Modularize
and move internal processors into their own factory.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new d2f9f03 CAMEL-15753: camel-core - Modularize and move internal processors into their own factory.
d2f9f03 is described below
commit d2f9f03e0cd3de3fe5097c708866ed5d02d00bc4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 26 08:07:14 2020 +0100
CAMEL-15753: camel-core - Modularize and move internal processors into their own factory.
---
.../org/apache/camel/ExtendedCamelContext.java | 15 +++++
.../org/apache/camel/spi/InternalProcessor.java | 3 +-
.../apache/camel/spi/InternalProcessorFactory.java | 54 ++++++++++++++++++
.../camel/impl/engine/AbstractCamelContext.java | 29 +++++++++-
.../apache/camel/impl/engine/DefaultChannel.java | 8 +--
.../camel/impl/engine/DefaultProducerCache.java | 15 +----
.../camel/impl/engine/SimpleCamelContext.java | 10 ++++
.../impl/engine/SubscribeMethodProcessor.java | 8 +--
.../camel/impl/ExtendedCamelContextConfigurer.java | 5 ++
.../camel/impl/lw/LightweightCamelContext.java | 11 ++++
.../impl/lw/LightweightRuntimeCamelContext.java | 17 +++++-
.../org/apache/camel/internal-processor-factory | 2 +
.../processor/DefaultInternalProcessorFactory.java | 66 ++++++++++++++++++++++
.../InterceptSendToEndpointProcessor.java | 2 +-
.../apache/camel/processor/UnitOfWorkProducer.java | 14 +----
.../org/apache/camel/reifier/AggregateReifier.java | 9 +--
.../apache/camel/reifier/OnCompletionReifier.java | 9 +--
.../org/apache/camel/reifier/ProcessorReifier.java | 4 +-
.../apache/camel/reifier/ResequenceReifier.java | 17 ++----
.../org/apache/camel/reifier/RouteReifier.java | 7 +--
.../org/apache/camel/reifier/WireTapReifier.java | 9 +--
.../support/DefaultInterceptSendToEndpoint.java | 11 +---
22 files changed, 232 insertions(+), 93 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 6700d00..25e4817 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -40,6 +40,7 @@ import org.apache.camel.spi.FactoryFinderResolver;
import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.LogListener;
@@ -357,6 +358,20 @@ public interface ExtendedCamelContext extends CamelContext {
void setProcessorFactory(ProcessorFactory processorFactory);
/**
+ * Gets the current {@link org.apache.camel.spi.InternalProcessorFactory}
+ *
+ * @return the factory
+ */
+ InternalProcessorFactory getInternalProcessorFactory();
+
+ /**
+ * Sets a custom {@link org.apache.camel.spi.InternalProcessorFactory}
+ *
+ * @param internalProcessorFactory the custom factory
+ */
+ void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory);
+
+ /**
* Gets the current {@link org.apache.camel.spi.InterceptEndpointFactory}
*
* @return the factory
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
index f5fd933..1275ef6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
@@ -18,6 +18,7 @@ package org.apache.camel.spi;
import java.util.List;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Processor;
import org.apache.camel.Route;
@@ -36,7 +37,7 @@ import org.apache.camel.Route;
* </ul>
* ... and more.
*/
-public interface InternalProcessor extends Processor {
+public interface InternalProcessor extends AsyncProcessor {
/**
* Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
new file mode 100644
index 0000000..c619239
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+
+/**
+ * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is
+ * used to have loose coupling between the modules in core. Camel users should only use {@link ProcessorFactory}.
+ *
+ * @see ProcessorFactory
+ */
+public interface InternalProcessorFactory {
+
+ /**
+ * Service factory key.
+ */
+ String FACTORY = "internal-processor-factory";
+
+ InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route);
+
+ SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext);
+
+ Channel createChannel(CamelContext camelContext);
+
+ AsyncProducer createInterceptSendToEndpointProcessor(
+ InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip);
+
+ AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped);
+
+ AsyncProducer createUnitOfWorkProducer(Producer producer);
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index dcf5a1d..88633e8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -104,6 +104,7 @@ import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptSendToEndpoint;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
@@ -275,6 +276,7 @@ public abstract class AbstractCamelContext extends BaseService
private volatile PackageScanResourceResolver packageScanResourceResolver;
private volatile NodeIdFactory nodeIdFactory;
private volatile ProcessorFactory processorFactory;
+ private volatile InternalProcessorFactory internalProcessorFactory;
private volatile InterceptEndpointFactory interceptEndpointFactory;
private volatile RouteFactory routeFactory;
private volatile MessageHistoryFactory messageHistoryFactory;
@@ -3252,19 +3254,23 @@ public abstract class AbstractCamelContext extends BaseService
getModelToXMLDumper();
getClassResolver();
getNodeIdFactory();
- getProcessorFactory();
getModelJAXBContextFactory();
getUuidGenerator();
getUnitOfWorkFactory();
getRouteController();
try {
+ getProcessorFactory();
+ getInternalProcessorFactory();
+ } catch (IllegalArgumentException e) {
+ // ignore in case camel-core-processor is not on the classpath
+ }
+ try {
getBeanProxyFactory();
getBeanProcessorFactory();
} catch (IllegalArgumentException e) {
// ignore in case camel-bean is not on the classpath
}
getBeanPostProcessor();
- getProcessorFactory();
}
/**
@@ -3806,6 +3812,23 @@ public abstract class AbstractCamelContext extends BaseService
}
@Override
+ public InternalProcessorFactory getInternalProcessorFactory() {
+ if (internalProcessorFactory == null) {
+ synchronized (lock) {
+ if (internalProcessorFactory == null) {
+ setInternalProcessorFactory(createInternalProcessorFactory());
+ }
+ }
+ }
+ return internalProcessorFactory;
+ }
+
+ @Override
+ public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+ this.internalProcessorFactory = doAddService(internalProcessorFactory);
+ }
+
+ @Override
public InterceptEndpointFactory getInterceptEndpointFactory() {
if (interceptEndpointFactory == null) {
synchronized (lock) {
@@ -4231,6 +4254,8 @@ public abstract class AbstractCamelContext extends BaseService
protected abstract ProcessorFactory createProcessorFactory();
+ protected abstract InternalProcessorFactory createInternalProcessorFactory();
+
protected abstract InterceptEndpointFactory createInterceptEndpointFactory();
protected abstract RouteFactory createRouteFactory();
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index a36bd4d..3f1e5c6 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -239,11 +238,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
}
if (!(wrapped instanceof WrapAwareProcessor)) {
// wrap the target so it becomes a service and we can manage its lifecycle
- Map<String, Object> args = new HashMap<>();
- args.put("processor", wrapped);
- args.put("wrapped", target);
- wrapped = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "WrapProcessor", args);
+ wrapped = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .createWrapProcessor(wrapped, target);
}
target = wrapped;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index 19bb01b..ff92f1a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -30,7 +30,6 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StatefulService;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ProducerCache;
@@ -81,18 +80,8 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
}
// internal processor used for sending
- try {
- sharedInternalProcessor
- = (SharedInternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
- "SharedCamelInternalProcessor", null);
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeException(e);
- }
- if (sharedInternalProcessor == null) {
- throw new IllegalStateException(
- "Cannot create SharedCamelInternalProcessor from ProcessorFactory." +
- "If you have a custom ProcessorFactory then extend DefaultProcessorFactory and let the default able to create SharedCamelInternalProcessor");
- }
+ sharedInternalProcessor
+ = this.camelContext.getInternalProcessorFactory().createSharedCamelInternalProcessor(this.camelContext);
}
protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index f2a9da4..bcc6e0f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -47,6 +47,7 @@ import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptEndpointFactory;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
@@ -204,6 +205,15 @@ public class SimpleCamelContext extends AbstractCamelContext {
}
@Override
+ protected InternalProcessorFactory createInternalProcessorFactory() {
+ return new BaseServiceResolver<>(InternalProcessorFactory.FACTORY, InternalProcessorFactory.class)
+ .resolve(getCamelContextReference())
+ .orElseThrow(() -> new IllegalArgumentException(
+ "Cannot find InternalProcessorFactory on classpath. "
+ + "Add camel-core-processor to classpath."));
+ }
+
+ @Override
protected InterceptEndpointFactory createInterceptEndpointFactory() {
return new DefaultInterceptEndpointFactory();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index 8f3161e..4242740 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -60,11 +59,8 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem
.getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method);
// must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
- Map<String, Object> args = new HashMap<>();
- args.put("processor", answer);
- args.put("route", null);
- answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(endpoint.getCamelContext(), "UnitOfWorkProcessorAdvice", args);
+ answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(endpoint.getCamelContext(), answer, null);
Predicate p;
if (ObjectHelper.isEmpty(predicate)) {
p = PredicateBuilder.constant(true);
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 490fe14..e93005b 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -44,6 +44,7 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
map.put("InflightRepository", org.apache.camel.spi.InflightRepository.class);
map.put("Injector", org.apache.camel.spi.Injector.class);
map.put("InterceptEndpointFactory", org.apache.camel.spi.InterceptEndpointFactory.class);
+ map.put("InternalProcessorFactory", org.apache.camel.spi.InternalProcessorFactory.class);
map.put("LanguageResolver", org.apache.camel.spi.LanguageResolver.class);
map.put("LoadTypeConverters", java.lang.Boolean.class);
map.put("LogExhaustedMessageBody", java.lang.Boolean.class);
@@ -149,6 +150,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "Injector": target.setInjector(property(camelContext, org.apache.camel.spi.Injector.class, value)); return true;
case "interceptendpointfactory":
case "InterceptEndpointFactory": target.setInterceptEndpointFactory(property(camelContext, org.apache.camel.spi.InterceptEndpointFactory.class, value)); return true;
+ case "internalprocessorfactory":
+ case "InternalProcessorFactory": target.setInternalProcessorFactory(property(camelContext, org.apache.camel.spi.InternalProcessorFactory.class, value)); return true;
case "languageresolver":
case "LanguageResolver": target.setLanguageResolver(property(camelContext, org.apache.camel.spi.LanguageResolver.class, value)); return true;
case "loadtypeconverters":
@@ -306,6 +309,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "Injector": return target.getInjector();
case "interceptendpointfactory":
case "InterceptEndpointFactory": return target.getInterceptEndpointFactory();
+ case "internalprocessorfactory":
+ case "InternalProcessorFactory": return target.getInternalProcessorFactory();
case "languageresolver":
case "LanguageResolver": return target.getLanguageResolver();
case "loadtypeconverters":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 9cae189..e513636 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -96,6 +96,7 @@ import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
@@ -1238,6 +1239,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
}
@Override
+ public InternalProcessorFactory getInternalProcessorFactory() {
+ return getExtendedCamelContext().getInternalProcessorFactory();
+ }
+
+ @Override
+ public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+ getExtendedCamelContext().setInternalProcessorFactory(internalProcessorFactory);
+ }
+
+ @Override
public InterceptEndpointFactory getInterceptEndpointFactory() {
return getExtendedCamelContext().getInterceptEndpointFactory();
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index 6c11515..a61a186 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -92,6 +92,7 @@ import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptEndpointFactory;
import org.apache.camel.spi.InterceptSendToEndpoint;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
@@ -169,6 +170,8 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
private final ClassLoader applicationContextClassLoader;
private final UnitOfWorkFactory unitOfWorkFactory;
private final RouteController routeController;
+ private final ProcessorFactory processorFactory;
+ private final InternalProcessorFactory internalProcessorFactory;
private final InflightRepository inflightRepository;
private final Injector injector;
private final ClassResolver classResolver;
@@ -209,6 +212,8 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
shutdownStrategy = context.getShutdownStrategy();
applicationContextClassLoader = context.getApplicationContextClassLoader();
unitOfWorkFactory = context.adapt(ExtendedCamelContext.class).getUnitOfWorkFactory();
+ processorFactory = context.adapt(ExtendedCamelContext.class).getProcessorFactory();
+ internalProcessorFactory = context.adapt(ExtendedCamelContext.class).getInternalProcessorFactory();
routeController = context.getRouteController();
inflightRepository = context.getInflightRepository();
globalOptions = context.getGlobalOptions();
@@ -1365,7 +1370,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
@Override
public ProcessorFactory getProcessorFactory() {
- throw new UnsupportedOperationException();
+ return processorFactory;
}
@Override
@@ -1374,6 +1379,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
}
@Override
+ public InternalProcessorFactory getInternalProcessorFactory() {
+ return internalProcessorFactory;
+ }
+
+ @Override
+ public void setInternalProcessorFactory(InternalProcessorFactory internalProcessorFactory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public InterceptEndpointFactory getInterceptEndpointFactory() {
throw new UnsupportedOperationException();
}
diff --git a/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory
new file mode 100644
index 0000000..5a0fc49
--- /dev/null
+++ b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/internal-processor-factory
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.DefaultInternalProcessorFactory
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
new file mode 100644
index 0000000..4f8acf8
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.impl.engine.DefaultChannel;
+import org.apache.camel.spi.InterceptSendToEndpoint;
+import org.apache.camel.spi.InternalProcessor;
+import org.apache.camel.spi.InternalProcessorFactory;
+import org.apache.camel.spi.SharedInternalProcessor;
+import org.apache.camel.spi.annotations.JdkService;
+
+@JdkService(InternalProcessorFactory.FACTORY)
+public class DefaultInternalProcessorFactory implements InternalProcessorFactory {
+
+ public InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route) {
+ CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
+ internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ return internal;
+ }
+
+ public SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext) {
+ return new SharedCamelInternalProcessor(
+ camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
+ }
+
+ public Channel createChannel(CamelContext camelContext) {
+ return new DefaultChannel(camelContext);
+ }
+
+ public AsyncProducer createInterceptSendToEndpointProcessor(
+ InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) {
+ return new InterceptSendToEndpointProcessor(endpoint, delegate, producer, skip);
+ }
+
+ public AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped) {
+ return new WrapProcessor(processor, wrapped);
+ }
+
+ public AsyncProducer createUnitOfWorkProducer(Producer producer) {
+ return new UnitOfWorkProducer(producer);
+ }
+
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 595a497..6a8187b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -48,7 +48,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
private final boolean skip;
public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer,
- boolean skip) throws Exception {
+ boolean skip) {
super(delegate);
this.endpoint = endpoint;
this.delegate = delegate;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index 242fbdd..46e01e3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,16 +16,12 @@
*/
package org.apache.camel.processor;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
@@ -46,16 +42,8 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
super(producer.getEndpoint());
this.producer = producer;
// wrap in unit of work
-
- Map<String, Object> args = new HashMap<>();
- args.put("processor", producer);
- args.put("route", null);
ExtendedCamelContext ecc = producer.getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class);
- try {
- this.processor = (AsyncProcessor) ecc.getProcessorFactory().createProcessor(ecc, "UnitOfWorkProcessorAdvice", args);
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeException(e);
- }
+ this.processor = ecc.getInternalProcessorFactory().addUnitOfWorkProcessorAdvice(ecc, producer, null);
}
@Override
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index e852aab..cef3a42 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.reifier;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -54,11 +52,8 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
Processor childProcessor = this.createChildProcessor(true);
// wrap the aggregate route in a unit of work processor
- Map<String, Object> args = new HashMap<>();
- args.put("processor", childProcessor);
- args.put("route", route);
- AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+ AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
Expression correlation = createExpression(definition.getExpression());
AggregationStrategy strategy = createAggregationStrategy();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 32f5ce9..497bf4c 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.reifier;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncProcessor;
@@ -55,11 +53,8 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
Processor childProcessor = this.createChildProcessor(true);
// wrap the on completion route in a unit of work processor
- Map<String, Object> args = new HashMap<>();
- args.put("processor", childProcessor);
- args.put("route", route);
- AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+ AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
route.setOnCompletion(getId(definition), target);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index bc7288b..f7a0546 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -529,8 +529,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
throws Exception {
// put a channel in between this and each output to control the route flow logic
- Channel channel = (Channel) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "DefaultChannel", null);
+ Channel channel = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .createChannel(camelContext);
// add interceptor strategies to the channel must be in this order:
// camel context, route context, local
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index e0caeb4..86db86a 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.reifier;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
@@ -77,11 +74,8 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Expression expression = createExpression(definition.getExpression());
// and wrap in unit of work
- Map<String, Object> args = new HashMap<>();
- args.put("processor", processor);
- args.put("route", route);
- AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+ AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
ObjectHelper.notNull(config, "config", this);
ObjectHelper.notNull(expression, "expression", this);
@@ -111,11 +105,8 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Processor processor = this.createChildProcessor(true);
Expression expression = createExpression(definition.getExpression());
- Map<String, Object> args = new HashMap<>();
- args.put("processor", processor);
- args.put("route", route);
- AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+ AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, processor, route);
ObjectHelper.notNull(config, "config", this);
ObjectHelper.notNull(expression, "expression", this);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index ddb3bee..1ac63f5 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -237,11 +237,8 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
Processor target = new Pipeline(camelContext, eventDrivenProcessors);
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
- Map<String, Object> args = new HashMap<>();
- args.put("processor", target);
- args.put("route", route);
- InternalProcessor internal = (InternalProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "CamelInternalProcessor", args);
+ InternalProcessor internal = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, target, route);
// and then optionally add route policy processor if a custom policy is set
List<RoutePolicy> routePolicyList = route.getRoutePolicyList();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index f0cb9fd..78517fb 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.reifier;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncProcessor;
@@ -54,11 +52,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
Processor childProcessor = wrapInErrorHandler(dynamicTo, true);
// and wrap in unit of work
- Map<String, Object> args = new HashMap<>();
- args.put("processor", childProcessor);
- args.put("route", route);
- AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
+ AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route);
// is true by default
boolean isCopy = parseBoolean(definition.getCopy(), true);
diff --git a/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 454a0e6..74725c9 100644
--- a/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++ b/core/camel-support/src/generated/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.support;
-import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AsyncProducer;
@@ -126,14 +125,8 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint,
@Override
public AsyncProducer createAsyncProducer() throws Exception {
AsyncProducer producer = delegate.createAsyncProducer();
-
- Map<String, Object> args = new HashMap<>();
- args.put("endpoint", this);
- args.put("delegate", delegate);
- args.put("producer", producer);
- args.put("skip", skip);
- return (AsyncProducer) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
- .createProcessor(camelContext, "InterceptSendToEndpointProcessor", args);
+ return camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
+ .createInterceptSendToEndpointProcessor(this, delegate, producer, skip);
}
@Override