You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/25 15:43:55 UTC
[camel] 01/02: CAMEL-15753: camel-core - Modularize reifier should
not use base
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit c531f0e580c64f0729b48c70c77180d08d81c1f2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Oct 25 15:19:23 2020 +0100
CAMEL-15753: camel-core - Modularize reifier should not use base
---
.../src/main/java/org/apache/camel/Channel.java | 35 +++++++++++++++-
.../spi/ErrorHandlerRedeliveryCustomizer.java | 48 +++++++++++++++++++++
.../org/apache/camel/spi/InternalProcessor.java | 39 ++++++++++-------
...Processor.java => SharedInternalProcessor.java} | 5 ++-
.../org/apache/camel/spi/WrapAwareProcessor.java | 31 ++++++++++++++
.../camel/impl/engine/CamelInternalProcessor.java | 48 +++++++++++++++------
.../apache/camel/impl/engine}/DefaultChannel.java | 49 +++++++++-------------
.../camel/impl/engine/DefaultProducerCache.java | 15 +++----
.../impl/cloud/ServiceCallConfigurationTest.java | 2 +-
.../camel/impl/lw/LightweightCamelContext.java | 2 +-
.../camel/processor/DefaultProcessorFactory.java | 17 ++++++++
.../apache/camel/processor/MulticastProcessor.java | 1 +
.../processor/SharedCamelInternalProcessor.java | 4 +-
.../apache/camel/processor/UnitOfWorkProducer.java | 20 ++++++---
.../org/apache/camel/processor/WrapProcessor.java | 11 +++--
.../errorhandler/RedeliveryErrorHandler.java | 20 ++-------
core/camel-core-reifier/pom.xml | 17 +++++---
.../org/apache/camel/reifier/AggregateReifier.java | 15 ++++---
.../apache/camel/reifier/OnCompletionReifier.java | 17 +++++---
.../org/apache/camel/reifier/ProcessorReifier.java | 9 ++--
.../apache/camel/reifier/ResequenceReifier.java | 25 +++++++----
.../org/apache/camel/reifier/RouteReifier.java | 31 ++++++--------
.../org/apache/camel/reifier/WireTapReifier.java | 17 +++++---
.../RandomLoadBalanceJavaDSLBuilderTest.java | 2 +-
.../apache/camel/processor/ResequencerTest.java | 2 +-
25 files changed, 330 insertions(+), 152 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/Channel.java b/core/camel-api/src/main/java/org/apache/camel/Channel.java
index 6a5911e..e2777f8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Channel.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Channel.java
@@ -16,6 +16,11 @@
*/
package org.apache.camel;
+import java.util.List;
+
+import org.apache.camel.spi.ErrorHandler;
+import org.apache.camel.spi.InterceptStrategy;
+
/**
* Channel acts as a channel between {@link Processor}s in the route graph.
* <p/>
@@ -24,13 +29,41 @@ package org.apache.camel;
public interface Channel extends AsyncProcessor, Navigate<Processor> {
/**
- * Gets the {@link org.apache.camel.processor.ErrorHandler} this Channel uses.
+ * Initializes the channel. If the initialized output definition contained outputs (children) then the
+ * childDefinition will be set so we can leverage fine grained tracing
+ */
+ void initChannel(
+ Route route,
+ NamedNode definition,
+ NamedNode childDefinition,
+ List<InterceptStrategy> interceptors,
+ Processor nextProcessor,
+ NamedRoute routeDefinition,
+ boolean first)
+ throws Exception;
+
+ /**
+ * Post initializes the channel.
+ *
+ * @throws Exception is thrown if some error occurred
+ */
+ void postInitChannel() throws Exception;
+
+ /**
+ * Gets the {@link ErrorHandler} this Channel uses.
*
* @return the error handler, or <tt>null</tt> if no error handler is used.
*/
Processor getErrorHandler();
/**
+ * Sets the {@link ErrorHandler} this Channel uses.
+ *
+ * @param errorHandler the error handler
+ */
+ void setErrorHandler(Processor errorHandler);
+
+ /**
* Gets the wrapped output that at runtime should be delegated to.
*
* @return the output to route the {@link Exchange} to
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java b/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java
new file mode 100644
index 0000000..81eb10d
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ErrorHandlerRedeliveryCustomizer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * Customizer for {@link ErrorHandler} which supports redeliveries. This is used internally by Camel to instrument the
+ * error handler with additional instrumentations during route initialization.
+ */
+public interface ErrorHandlerRedeliveryCustomizer {
+
+ /**
+ * Determines if redelivery is enabled by checking if any of the redelivery policy settings may allow redeliveries.
+ *
+ * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
+ * @throws Exception can be thrown
+ */
+ boolean determineIfRedeliveryIsEnabled() throws Exception;
+
+ /**
+ * Returns the output processor
+ */
+ Processor getOutput();
+
+ /**
+ * Allows to change the output of the error handler which are used when optimising the JMX instrumentation to use
+ * either an advice or wrapped processor when calling a processor. The former is faster and therefore preferred,
+ * however if the error handler supports redelivery we need fine grained instrumentation which then must be wrapped
+ * and therefore need to change the output on the error handler.
+ */
+ void changeOutput(Processor output);
+
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
index 5b81f74..f5fd933 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
@@ -16,13 +16,13 @@
*/
package org.apache.camel.spi;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
+import java.util.List;
+
import org.apache.camel.Processor;
+import org.apache.camel.Route;
/**
- * An internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
+ * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
* <ul>
* <li>Execute {@link UnitOfWork}</li>
* <li>Keeping track which route currently is being routed</li>
@@ -35,25 +35,32 @@ import org.apache.camel.Processor;
* <li>{@link Transformer}</li>
* </ul>
* ... and more.
- * <p/>
- *
- * This is intended for internal use only - do not use this.
*/
public interface InternalProcessor extends Processor {
- @Override
- default void process(Exchange exchange) throws Exception {
- // not in use
- }
-
/**
- * Asynchronous API
+ * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
+ *
+ * @param advice the advice to add
*/
- boolean process(Exchange exchange, AsyncCallback originalCallback, AsyncProcessor processor, Processor resultProcessor);
+ void addAdvice(CamelInternalProcessorAdvice<?> advice);
/**
- * Synchronous API
+ * Gets the advice with the given type.
+ *
+ * @param type the type of the advice
+ * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
*/
- void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor);
+ <T> T getAdvice(Class<T> type);
+
+ void addRoutePolicyAdvice(List<RoutePolicy> routePolicyList);
+
+ void addRouteInflightRepositoryAdvice(InflightRepository inflightRepository, String routeId);
+
+ void addRouteLifecycleAdvice();
+
+ void addManagementInterceptStrategy(ManagementInterceptStrategy.InstrumentationProcessor processor);
+
+ void setRouteOnAdvices(Route route);
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
similarity index 89%
copy from core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
copy to core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
index 5b81f74..29c4159 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/SharedInternalProcessor.java
@@ -22,7 +22,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
- * An internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
+ * A Shared (thread safe) internal {@link Processor} that Camel routing engine used during routing for cross cutting
+ * functionality such as:
* <ul>
* <li>Execute {@link UnitOfWork}</li>
* <li>Keeping track which route currently is being routed</li>
@@ -39,7 +40,7 @@ import org.apache.camel.Processor;
*
* This is intended for internal use only - do not use this.
*/
-public interface InternalProcessor extends Processor {
+public interface SharedInternalProcessor extends Processor {
@Override
default void process(Exchange exchange) throws Exception {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java
new file mode 100644
index 0000000..3b346c5
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/WrapAwareProcessor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * An interface to represent an object which wraps a {@link Processor}.
+ */
+public interface WrapAwareProcessor {
+
+ /**
+ * Gets the wrapped {@link Processor}
+ */
+ Processor getWrapped();
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 8fbb08f..fd00655 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -42,6 +42,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.ReactiveExecutor;
@@ -91,7 +92,7 @@ import org.slf4j.LoggerFactory;
* <p/>
* The added advices can implement {@link Ordered} to control in which order the advices are executed.
*/
-public class CamelInternalProcessor extends DelegateAsyncProcessor {
+public class CamelInternalProcessor extends DelegateAsyncProcessor implements InternalProcessor {
private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
@@ -116,11 +117,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
this.shutdownStrategy = camelContext.getShutdownStrategy();
}
- /**
- * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
- *
- * @param advice the advice to add
- */
+ @Override
public void addAdvice(CamelInternalProcessorAdvice<?> advice) {
advices.add(advice);
// ensure advices are sorted so they are in the order we want
@@ -131,12 +128,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
}
}
- /**
- * Gets the advice with the given type.
- *
- * @param type the type of the advice
- * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
- */
+ @Override
public <T> T getAdvice(Class<T> type) {
for (CamelInternalProcessorAdvice task : advices) {
Object advice = unwrap(task);
@@ -147,6 +139,38 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
return null;
}
+ @Override
+ public void addRoutePolicyAdvice(List<RoutePolicy> routePolicyList) {
+ addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
+ }
+
+ @Override
+ public void addRouteInflightRepositoryAdvice(InflightRepository inflightRepository, String routeId) {
+ addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));
+ }
+
+ @Override
+ public void addRouteLifecycleAdvice() {
+ addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
+ }
+
+ @Override
+ public void addManagementInterceptStrategy(InstrumentationProcessor processor) {
+ addAdvice(CamelInternalProcessor.wrap(processor));
+ }
+
+ @Override
+ public void setRouteOnAdvices(Route route) {
+ RoutePolicyAdvice task = getAdvice(RoutePolicyAdvice.class);
+ if (task != null) {
+ task.setRoute(route);
+ }
+ RouteLifecycleAdvice task2 = getAdvice(RouteLifecycleAdvice.class);
+ if (task2 != null) {
+ task2.setRoute(route);
+ }
+ }
+
/**
* Callback task to process the advices after processing.
*/
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
similarity index 89%
rename from core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index 8692457..a36bd4d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor.channel;
+package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,21 +27,20 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Channel;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.NamedRoute;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.impl.debugger.BacklogDebugger;
import org.apache.camel.impl.debugger.BacklogTracer;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
-import org.apache.camel.processor.WrapProcessor;
-import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
import org.apache.camel.spi.Debugger;
-import org.apache.camel.spi.ErrorHandler;
+import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.Tracer;
+import org.apache.camel.spi.WrapAwareProcessor;
import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
@@ -105,11 +105,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
return nextProcessor;
}
- /**
- * Sets the {@link ErrorHandler} this Channel uses.
- *
- * @param errorHandler the error handler
- */
+ @Override
public void setErrorHandler(Processor errorHandler) {
this.errorHandler = errorHandler;
}
@@ -157,15 +153,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
ServiceHelper.stopAndShutdownServices(output, errorHandler);
}
- /**
- * Initializes the channel. If the initialized output definition contained outputs (children) then the
- * childDefinition will be set so we can leverage fine grained tracing
- *
- * @param route the route context
- * @param definition the route definition the {@link Channel} represents
- * @param childDefinition the child definition
- * @throws Exception is thrown if some error occurred
- */
+ @Override
public void initChannel(
Route route,
NamedNode definition,
@@ -249,9 +237,13 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
+ " but its not the most optimal solution. Please consider changing your interceptor to comply.",
strategy, definition);
}
- if (!(wrapped instanceof WrapProcessor)) {
+ if (!(wrapped instanceof WrapAwareProcessor)) {
// wrap the target so it becomes a service and we can manage its lifecycle
- wrapped = new WrapProcessor(wrapped, target);
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", wrapped);
+ args.put("wrapped", target);
+ wrapped = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "WrapProcessor", args);
}
target = wrapped;
}
@@ -268,24 +260,21 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
output = target;
}
- /**
- * Post initializes the channel.
- *
- * @throws Exception is thrown if some error occurred
- */
+ @Override
public void postInitChannel() throws Exception {
// if jmx was enabled for the processor then either add as advice or wrap and change the processor
// on the error handler. See more details in the class javadoc of InstrumentationProcessor
if (instrumentationProcessor != null) {
boolean redeliveryPossible = false;
- if (errorHandler instanceof RedeliveryErrorHandler) {
- redeliveryPossible = ((RedeliveryErrorHandler) errorHandler).determineIfRedeliveryIsEnabled();
+ if (errorHandler instanceof ErrorHandlerRedeliveryCustomizer) {
+ ErrorHandlerRedeliveryCustomizer erh = (ErrorHandlerRedeliveryCustomizer) errorHandler;
+ redeliveryPossible = erh.determineIfRedeliveryIsEnabled();
if (redeliveryPossible) {
// okay we can redeliver then we need to change the output in the error handler
// to use us which we then wrap the call so we can capture before/after for redeliveries as well
- Processor currentOutput = ((RedeliveryErrorHandler) errorHandler).getOutput();
+ Processor currentOutput = erh.getOutput();
instrumentationProcessor.setProcessor(currentOutput);
- ((RedeliveryErrorHandler) errorHandler).changeOutput(instrumentationProcessor);
+ erh.changeOutput(instrumentationProcessor);
}
}
if (!redeliveryPossible) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index aad974a..19bb01b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -33,8 +33,8 @@ import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StatefulService;
import org.apache.camel.spi.EndpointUtilizationStatistics;
-import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.SharedInternalProcessor;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.service.ServiceHelper;
@@ -54,7 +54,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
private final ExtendedCamelContext camelContext;
private final ProducerServicePool producers;
private final Object source;
- private final InternalProcessor internalProcessor;
+ private final SharedInternalProcessor sharedInternalProcessor;
private EndpointUtilizationStatistics statistics;
private boolean eventNotifierEnabled = true;
@@ -82,12 +82,13 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
// internal processor used for sending
try {
- internalProcessor = (InternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
- "SharedCamelInternalProcessor", null);
+ sharedInternalProcessor
+ = (SharedInternalProcessor) this.camelContext.getProcessorFactory().createProcessor(this.camelContext,
+ "SharedCamelInternalProcessor", null);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
}
- if (internalProcessor == null) {
+ if (sharedInternalProcessor == null) {
throw new IllegalStateException(
"Cannot create SharedCamelInternalProcessor from ProcessorFactory." +
"If you have a custom ProcessorFactory then extend DefaultProcessorFactory and let the default able to create SharedCamelInternalProcessor");
@@ -195,7 +196,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
}
// invoke the synchronous method
- internalProcessor.process(exchange, producer, resultProcessor);
+ sharedInternalProcessor.process(exchange, producer, resultProcessor);
} catch (Throwable e) {
// ensure exceptions is caught and set on the exchange
@@ -362,7 +363,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
callback = new EventNotifierCallback(callback, exchange, endpoint);
}
// invoke the asynchronous method
- return internalProcessor.process(exchange, callback, producer, resultProcessor);
+ return sharedInternalProcessor.process(exchange, callback, producer, resultProcessor);
} catch (Throwable e) {
// ensure exceptions is caught and set on the exchange
exchange.setException(e);
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
index 53bed2e..7fac92a 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
@@ -30,7 +30,7 @@ import org.apache.camel.model.cloud.ServiceCallConfigurationDefinition;
import org.apache.camel.model.cloud.ServiceCallDefinitionConstants;
import org.apache.camel.model.cloud.ServiceCallExpressionConfiguration;
import org.apache.camel.model.language.SimpleExpression;
-import org.apache.camel.processor.channel.DefaultChannel;
+import org.apache.camel.impl.engine.DefaultChannel;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index cb01e8c..9cae189 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -52,6 +52,7 @@ import org.apache.camel.ValueHolder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.DefaultChannel;
import org.apache.camel.impl.engine.DefaultRoute;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.FaultToleranceConfigurationDefinition;
@@ -67,7 +68,6 @@ import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.rest.RestDefinition;
import org.apache.camel.model.transformer.TransformerDefinition;
import org.apache.camel.model.validator.ValidatorDefinition;
-import org.apache.camel.processor.channel.DefaultChannel;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.BeanIntrospection;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
index cb4b71e..475a7c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
@@ -32,6 +32,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.impl.engine.DefaultChannel;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.InterceptSendToEndpoint;
import org.apache.camel.spi.ProcessorFactory;
@@ -87,6 +88,10 @@ public class DefaultProcessorFactory implements ProcessorFactory {
return null;
}
+ // TODO: Make an InternalProcessorFactory that are not for end users
+ // TODO: Add API with suitable method names on InternalProcessorFactory instead of this generic with Map args
+ // TODO: For map args then use Object[] as its faster
+
@Override
@SuppressWarnings("unchecked")
public Processor createProcessor(CamelContext camelContext, String definitionName, Map<String, Object> args)
@@ -120,6 +125,10 @@ public class DefaultProcessorFactory implements ProcessorFactory {
} else if ("UnitOfWorkProducer".equals(definitionName)) {
Producer producer = (Producer) args.get("producer");
return new UnitOfWorkProducer(producer);
+ } else if ("WrapProcessor".equals(definitionName)) {
+ Processor processor = (Processor) args.get("processor");
+ Processor wrapped = (Processor) args.get("wrapped");
+ return new WrapProcessor(processor, wrapped);
} else if ("InterceptSendToEndpointProcessor".equals(definitionName)) {
InterceptSendToEndpoint endpoint = (InterceptSendToEndpoint) args.get("endpoint");
Endpoint delegate = (Endpoint) args.get("delegate");
@@ -129,6 +138,14 @@ public class DefaultProcessorFactory implements ProcessorFactory {
} else if ("SharedCamelInternalProcessor".equals(definitionName)) {
return new SharedCamelInternalProcessor(
camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
+ } else if ("CamelInternalProcessor".equals(definitionName)) {
+ Processor processor = (Processor) args.get("processor");
+ Route route = (Route) args.get("route");
+ CamelInternalProcessor answer = new CamelInternalProcessor(camelContext, processor);
+ answer.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ return answer;
+ } else if ("DefaultChannel".equals(definitionName)) {
+ return new DefaultChannel(camelContext);
}
return null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 33abd8e..50f04ad 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -796,6 +796,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
protected Processor createUnitOfWorkProcessor(Route route, Processor processor, Exchange exchange) {
CamelInternalProcessor internal = new CamelInternalProcessor(exchange.getContext(), processor);
+ // TODO: use processor factory or spi
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
if (parent != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index bf3a84c..f6c26f0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -31,9 +31,9 @@ import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelInternalProcessorAdvice;
-import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.SharedInternalProcessor;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
* <p/>
* The added advices can implement {@link Ordered} to control in which order the advices are executed.
*/
-public class SharedCamelInternalProcessor implements InternalProcessor {
+public class SharedCamelInternalProcessor implements SharedInternalProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
private static final Object[] EMPTY_STATES = new Object[0];
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index d01c9c9..242fbdd 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,12 +16,16 @@
*/
package org.apache.camel.processor;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Producer;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
@@ -42,10 +46,16 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
super(producer.getEndpoint());
this.producer = producer;
// wrap in unit of work
- CamelInternalProcessor internal = new CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer);
- internal.addAdvice(
- new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, producer.getEndpoint().getCamelContext()));
- this.processor = internal;
+
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", producer);
+ args.put("route", null);
+ ExtendedCamelContext ecc = producer.getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class);
+ try {
+ this.processor = (AsyncProcessor) ecc.getProcessorFactory().createProcessor(ecc, "UnitOfWorkProcessorAdvice", args);
+ } catch (Exception e) {
+ throw RuntimeCamelException.wrapRuntimeException(e);
+ }
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
index 76a6983..2658162 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WrapProcessor.java
@@ -19,13 +19,14 @@ package org.apache.camel.processor;
import java.util.List;
import org.apache.camel.Processor;
+import org.apache.camel.spi.WrapAwareProcessor;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.support.service.ServiceHelper;
/**
* A processor which ensures wrapping processors is having lifecycle handled.
*/
-public class WrapProcessor extends DelegateAsyncProcessor {
+public class WrapProcessor extends DelegateAsyncProcessor implements WrapAwareProcessor {
private final Processor wrapped;
public WrapProcessor(Processor processor, Processor wrapped) {
@@ -35,7 +36,12 @@ public class WrapProcessor extends DelegateAsyncProcessor {
@Override
public String toString() {
- return "WrapDelegateAsyncProcessor[" + processor + "]";
+ return "WrapProcessor[" + processor + "]";
+ }
+
+ @Override
+ public Processor getWrapped() {
+ return wrapped;
}
@Override
@@ -57,5 +63,4 @@ public class WrapProcessor extends DelegateAsyncProcessor {
super.doStop();
ServiceHelper.stopService(wrapped);
}
-
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 1ac482d..15869e4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -40,6 +40,7 @@ import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelLogger;
+import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.ShutdownPrepared;
@@ -65,7 +66,7 @@ import org.slf4j.LoggerFactory;
* according to what they support.
*/
public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
- implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+ implements ErrorHandlerRedeliveryCustomizer, AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class);
@@ -192,12 +193,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
return callback.getFuture();
}
- /**
- * Allows to change the output of the error handler which are used when optimising the JMX instrumentation to use
- * either an advice or wrapped processor when calling a processor. The former is faster and therefore preferred,
- * however if the error handler supports redelivery we need fine grained instrumentation which then must be wrapped
- * and therefore need to change the output on the error handler.
- */
+ @Override
public void changeOutput(Processor output) {
this.output = output;
this.outputAsync = AsyncProcessorConverterHelper.convert(output);
@@ -307,9 +303,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
return answer;
}
- /**
- * Returns the output processor
- */
@Override
public Processor getOutput() {
return output;
@@ -1455,12 +1448,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
}
}
- /**
- * Determines if redelivery is enabled by checking if any of the redelivery policy settings may allow redeliveries.
- *
- * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
- * @throws Exception can be thrown
- */
+ @Override
public boolean determineIfRedeliveryIsEnabled() throws Exception {
// determine if redeliver is enabled either on error handler
if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
diff --git a/core/camel-core-reifier/pom.xml b/core/camel-core-reifier/pom.xml
index 8d99a4f..6483577 100644
--- a/core/camel-core-reifier/pom.xml
+++ b/core/camel-core-reifier/pom.xml
@@ -46,11 +46,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-api</artifactId>
</dependency>
- <!-- TODO: Remove this dependency -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-base</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-support</artifactId>
@@ -58,10 +53,22 @@
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-model</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-base</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-processor</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-base</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 3d7f7e7..e852aab 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -16,16 +16,19 @@
*/
package org.apache.camel.reifier;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.AggregationStrategy;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.model.AggregateDefinition;
import org.apache.camel.model.OptimisticLockRetryPolicyDefinition;
import org.apache.camel.model.ProcessorDefinition;
@@ -50,10 +53,12 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
protected AggregateProcessor createAggregator() throws Exception {
Processor childProcessor = this.createChildProcessor(true);
- // TODO: Make this via SPI or some facade
// wrap the aggregate route in a unit of work processor
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", childProcessor);
+ args.put("route", route);
+ AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
Expression correlation = createExpression(definition.getExpression());
AggregationStrategy strategy = createAggregationStrategy();
@@ -70,7 +75,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
}
AggregateProcessor answer
- = new AggregateProcessor(camelContext, internal, correlation, strategy, threadPool, shutdownThreadPool);
+ = new AggregateProcessor(camelContext, target, correlation, strategy, threadPool, shutdownThreadPool);
AggregationRepository repository = createAggregationRepository();
if (repository != null) {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 7866a00..32f5ce9 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -16,12 +16,15 @@
*/
package org.apache.camel.reifier;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.model.OnCompletionDefinition;
import org.apache.camel.model.OnCompletionMode;
import org.apache.camel.model.ProcessorDefinition;
@@ -51,12 +54,14 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
Processor childProcessor = this.createChildProcessor(true);
- // TODO: Make this via SPI or some facade
// wrap the on completion route in a unit of work processor
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", childProcessor);
+ args.put("route", route);
+ AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
- route.setOnCompletion(getId(definition), internal);
+ route.setOnCompletion(getId(definition), target);
Predicate when = null;
if (definition.getOnWhen() != null) {
@@ -71,7 +76,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
|| parse(OnCompletionMode.class, definition.getMode()) == OnCompletionMode.AfterConsumer;
OnCompletionProcessor answer = new OnCompletionProcessor(
- camelContext, internal, threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when,
+ camelContext, target, threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when,
original, afterConsumer);
return answer;
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index bf6297f..bc7288b 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -104,7 +104,6 @@ import org.apache.camel.model.WireTapDefinition;
import org.apache.camel.model.cloud.ServiceCallDefinition;
import org.apache.camel.processor.InterceptEndpointProcessor;
import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.channel.DefaultChannel;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.InterceptStrategy;
@@ -529,9 +528,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler)
throws Exception {
- // put a channel in between this and each output to control the route
- // flow logic
- DefaultChannel channel = new DefaultChannel(camelContext);
+ // put a channel in between this and each output to control the route flow logic
+ Channel channel = (Channel) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "DefaultChannel", null);
// add interceptor strategies to the channel must be in this order:
// camel context, route context, local
@@ -635,7 +634,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
* @param inheritErrorHandler whether to inherit error handler
* @throws Exception can be thrown if failed to create error handler builder
*/
- private void wrapChannelInErrorHandler(DefaultChannel channel, Boolean inheritErrorHandler) throws Exception {
+ private void wrapChannelInErrorHandler(Channel channel, Boolean inheritErrorHandler) throws Exception {
if (inheritErrorHandler == null || inheritErrorHandler) {
log.trace("{} is configured to inheritErrorHandler", definition);
Processor output = channel.getOutput();
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index d7b15de..e0caeb4 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -16,10 +16,14 @@
*/
package org.apache.camel.reifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ResequenceDefinition;
import org.apache.camel.model.config.BatchResequencerConfig;
@@ -72,10 +76,12 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Processor processor = this.createChildProcessor(true);
Expression expression = createExpression(definition.getExpression());
- // TODO: Make this via SPI or some facade
// and wrap in unit of work
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", processor);
+ args.put("route", route);
+ AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
ObjectHelper.notNull(config, "config", this);
ObjectHelper.notNull(expression, "expression", this);
@@ -83,7 +89,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
boolean isReverse = parseBoolean(config.getReverse(), false);
boolean isAllowDuplicates = parseBoolean(config.getAllowDuplicates(), false);
- Resequencer resequencer = new Resequencer(camelContext, internal, expression, isAllowDuplicates, isReverse);
+ Resequencer resequencer = new Resequencer(camelContext, target, expression, isAllowDuplicates, isReverse);
resequencer.setBatchSize(parseInt(config.getBatchSize()));
resequencer.setBatchTimeout(parseDuration(config.getBatchTimeout()));
resequencer.setReverse(isReverse);
@@ -105,8 +111,11 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
Processor processor = this.createChildProcessor(true);
Expression expression = createExpression(definition.getExpression());
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", processor);
+ args.put("route", route);
+ AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
ObjectHelper.notNull(config, "config", this);
ObjectHelper.notNull(expression, "expression", this);
@@ -122,7 +131,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
}
comparator.setExpression(expression);
- StreamResequencer resequencer = new StreamResequencer(camelContext, internal, comparator, expression);
+ StreamResequencer resequencer = new StreamResequencer(camelContext, target, comparator, expression);
resequencer.setTimeout(parseDuration(config.getTimeout()));
if (config.getDeliveryAttemptInterval() != null) {
resequencer.setDeliveryAttemptInterval(parseDuration(config.getDeliveryAttemptInterval()));
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 6a9c25c..ddb3bee 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -32,7 +32,6 @@ import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.PropertyDefinition;
import org.apache.camel.model.RouteDefinition;
@@ -40,6 +39,7 @@ import org.apache.camel.processor.ContractAdvice;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.reifier.rest.RestBindingReifier;
import org.apache.camel.spi.Contract;
+import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.RoutePolicy;
@@ -236,10 +236,12 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
// handles preparing the response from the exchange in regard to IN vs OUT messages etc
Processor target = new Pipeline(camelContext, eventDrivenProcessors);
- // TODO: Make this via SPI or some facade
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", target);
+ args.put("route", route);
+ InternalProcessor internal = (InternalProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "CamelInternalProcessor", args);
// and then optionally add route policy processor if a custom policy is set
List<RoutePolicy> routePolicyList = route.getRoutePolicyList();
@@ -256,21 +258,20 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
}
}
- internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
+ internal.addRoutePolicyAdvice(routePolicyList);
}
// wrap in route inflight processor to track number of inflight exchanges for the route
- internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(
- camelContext.getInflightRepository(), route.getRouteId()));
+ internal.addRouteInflightRepositoryAdvice(camelContext.getInflightRepository(), route.getRouteId());
// wrap in JMX instrumentation processor that is used for performance stats
ManagementInterceptStrategy managementInterceptStrategy = route.getManagementInterceptStrategy();
if (managementInterceptStrategy != null) {
- internal.addAdvice(CamelInternalProcessor.wrap(managementInterceptStrategy.createProcessor("route")));
+ internal.addManagementInterceptStrategy(managementInterceptStrategy.createProcessor("route"));
}
// wrap in route lifecycle
- internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
+ internal.addRouteLifecycleAdvice();
// add advices
if (definition.getRestBindingDefinition() != null) {
@@ -308,16 +309,8 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
route.setAutoStartup(isAutoStartup);
}
- // after the route is created then set the route on the policy processor so we get hold of it
- CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
- if (task != null) {
- task.setRoute(route);
- }
- CamelInternalProcessor.RouteLifecycleAdvice task2
- = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class);
- if (task2 != null) {
- task2.setRoute(route);
- }
+ // after the route is created then set the route on the policy processor(s) so we get hold of it
+ internal.setRouteOnAdvices(route);
// invoke init on route policy
if (routePolicyList != null && !routePolicyList.isEmpty()) {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index c14b092..f0cb9fd 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.reifier;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.SetHeaderDefinition;
import org.apache.camel.model.WireTapDefinition;
@@ -48,18 +51,20 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor();
// create error handler we need to use for processing the wire tapped
- Processor target = wrapInErrorHandler(dynamicTo, true);
+ Processor childProcessor = wrapInErrorHandler(dynamicTo, true);
- // TODO: Make this via SPI or some facade
// and wrap in unit of work
- CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target);
- internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
+ Map<String, Object> args = new HashMap<>();
+ args.put("processor", childProcessor);
+ args.put("route", route);
+ AsyncProcessor target = (AsyncProcessor) camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory()
+ .createProcessor(camelContext, "UnitOfWorkProcessorAdvice", args);
// is true by default
boolean isCopy = parseBoolean(definition.getCopy(), true);
WireTapProcessor answer = new WireTapProcessor(
- dynamicTo, internal,
+ dynamicTo, target,
parse(ExchangePattern.class, definition.getPattern()),
threadPool, shutdownThreadPool,
parseBoolean(definition.getDynamicUri(), true));
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
index ba17095..0bc0f6b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
@@ -21,12 +21,12 @@ import java.util.List;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.impl.engine.DefaultChannel;
import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.SendDefinition;
import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition;
-import org.apache.camel.processor.channel.DefaultChannel;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
index 5499007..1951fd4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
@@ -23,8 +23,8 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.engine.DefaultChannel;
import org.apache.camel.impl.engine.DefaultRoute;
-import org.apache.camel.processor.channel.DefaultChannel;
import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;