You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2020/03/04 08:57:48 UTC
[camel] 03/32: Move all ErrorHandler reification to the reifiers
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4275688e8e36e7c17e7bd5bd6dac56b32744184a
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Thu Feb 20 13:37:20 2020 +0100
Move all ErrorHandler reification to the reifiers
# Conflicts:
# core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java
# core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
# core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
# core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java
---
.../JtaTransactionErrorHandlerBuilder.java | 99 +++-------------
...java => JtaTransactionErrorHandlerReifier.java} | 129 +++++++++------------
.../cdi/transaction/JtaTransactionPolicy.java | 10 +-
.../processor/HystrixHierarchicalConfigTest.java | 2 +-
.../SpringHystrixRouteHierarchicalConfigTest.java | 2 +-
.../spring/spi/TransactionErrorHandlerBuilder.java | 61 ----------
.../apache/camel/spring/EndpointReferenceTest.java | 2 -
.../spring/config/DummyErrorHandlerBuilder.java | 42 ++++---
.../java/org/apache/camel/ErrorHandlerFactory.java | 23 ----
.../java/org/apache/camel/spi/RouteContext.java | 2 +
...RouteContext.java => AbstractRouteContext.java} | 16 ++-
.../apache/camel/processor/MulticastProcessor.java | 21 +---
.../camel/processor/RecipientListProcessor.java | 20 +---
.../camel/builder/AdviceWithRouteBuilder.java | 25 ++--
.../camel/builder/DeadLetterChannelBuilder.java | 33 ------
.../camel/builder/DefaultErrorHandlerBuilder.java | 38 ------
.../camel/builder/ErrorHandlerBuilderRef.java | 29 -----
.../camel/builder/ErrorHandlerBuilderSupport.java | 103 +---------------
.../camel/builder/NoErrorHandlerBuilder.java | 30 -----
.../java/org/apache/camel/impl/DefaultModel.java | 1 -
.../DefaultRouteContext.java} | 20 ++--
.../apache/camel/reifier/DynamicRouterReifier.java | 6 +-
.../org/apache/camel/reifier/ProcessorReifier.java | 14 ++-
.../apache/camel/reifier/RecipientListReifier.java | 2 +-
.../org/apache/camel/reifier/RouteReifier.java | 1 -
.../apache/camel/reifier/RoutingSlipReifier.java | 9 +-
.../org/apache/camel/reifier/WireTapReifier.java | 2 +-
.../errorhandler/ErrorHandlerRefReifier.java | 12 +-
.../reifier/errorhandler/ErrorHandlerReifier.java | 76 ++++++++----
.../errorhandler/ErrorHandlerSupportTest.java | 19 +--
.../DefaultExceptionPolicyStrategyTest.java | 10 +-
.../camel/reifier/DataFormatReifierTest.java | 8 +-
.../apache/camel/reifier/ProcessorReifierTest.java | 2 +-
.../apache/camel/support/CamelContextHelper.java | 1 -
34 files changed, 262 insertions(+), 608 deletions(-)
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
index bb15f57..f56598a 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
@@ -16,22 +16,11 @@
*/
package org.apache.camel.cdi.transaction;
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.DefaultErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.model.TransactedDefinition;
-import org.apache.camel.reifier.TransactedReifier;
+import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
import org.apache.camel.spi.CamelLogger;
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.TransactedPolicy;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@@ -40,12 +29,9 @@ import org.slf4j.LoggerFactory;
*/
public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
- public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY =
- JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel";
-
- private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
-
- private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
+ static {
+ ErrorHandlerReifier.registerReifier(JtaTransactionErrorHandlerBuilder.class, JtaTransactionErrorHandlerReifier::new);
+ }
private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
@@ -75,73 +61,8 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde
}
}
- @Override
- public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
- CamelContext camelContext = routeContext.getCamelContext();
- // resolve policy reference, if given
- if (transactionPolicy == null) {
- if (policyRef != null) {
- final TransactedDefinition transactedDefinition = new TransactedDefinition();
- transactedDefinition.setRef(policyRef);
- final Policy policy = new TransactedReifier(camelContext, transactedDefinition).resolvePolicy();
- if (policy != null) {
- if (!(policy instanceof JtaTransactionPolicy)) {
- throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
- + policyRef.getClass().getName() + "' but an instance of '"
- + JtaTransactionPolicy.class.getName() + "' is required!");
- }
- transactionPolicy = (JtaTransactionPolicy) policy;
- }
- }
- }
-
- // try to lookup default policy
- if (transactionPolicy == null) {
- LOG.debug(
- "No transaction policy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
-
- Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class);
- if (mapPolicy != null && mapPolicy.size() == 1) {
- TransactedPolicy policy = mapPolicy.values().iterator().next();
- if (policy instanceof JtaTransactionPolicy) {
- transactionPolicy = (JtaTransactionPolicy) policy;
- }
- }
-
- if (transactionPolicy == null) {
- TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class);
- if (policy instanceof JtaTransactionPolicy) {
- transactionPolicy = (JtaTransactionPolicy) policy;
- }
- }
-
- if (transactionPolicy != null) {
- LOG.debug("Found TransactionPolicy in registry to use: {}", transactionPolicy);
- }
- }
-
- ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
-
- final Map<String, String> properties = camelContext.getGlobalOptions();
- if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
- rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
- }
-
- JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
- processor,
- getLogger(),
- getOnRedelivery(),
- getRedeliveryPolicy(),
- getExceptionPolicyStrategy(),
- transactionPolicy,
- getRetryWhilePolicy(camelContext),
- getExecutorService(camelContext),
- rollbackLoggingLevel,
- getOnExceptionOccurred());
-
- // configure error handler before we can use it
- configure(routeContext, answer);
- return answer;
+ public String getPolicyRef() {
+ return policyRef;
}
public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
@@ -149,11 +70,19 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde
return this;
}
+ public JtaTransactionPolicy getTransactionPolicy() {
+ return transactionPolicy;
+ }
+
public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
this.transactionPolicy = transactionPolicy;
return this;
}
+ public LoggingLevel getRollbackLoggingLevel() {
+ return rollbackLoggingLevel;
+ }
+
public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
this.rollbackLoggingLevel = rollbackLoggingLevel;
return this;
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java
similarity index 51%
copy from components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
copy to components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java
index bb15f57..252e3ee 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java
@@ -17,77 +17,51 @@
package org.apache.camel.cdi.transaction;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
-import org.apache.camel.CamelContext;
+import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.builder.DefaultErrorHandlerBuilder;
-import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.model.TransactedDefinition;
import org.apache.camel.reifier.TransactedReifier;
-import org.apache.camel.spi.CamelLogger;
+import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.spi.TransactedPolicy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Builds transactional error handlers. This class is based on
- * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}.
- */
-public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
+public class JtaTransactionErrorHandlerReifier extends ErrorHandlerReifier<JtaTransactionErrorHandlerBuilder> {
public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY =
- JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel";
-
- private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class);
+ JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel";
private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
- private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN;
-
- private JtaTransactionPolicy transactionPolicy;
-
- private String policyRef;
+ private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerReifier.class);
- @Override
- public boolean supportTransacted() {
- return true;
+ public JtaTransactionErrorHandlerReifier(RouteContext routeContext, ErrorHandlerFactory definition) {
+ super(routeContext, (JtaTransactionErrorHandlerBuilder) definition);
}
@Override
- public ErrorHandlerBuilder cloneBuilder() {
- final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder();
- cloneBuilder(answer);
- return answer;
- }
+ public Processor createErrorHandler(final Processor processor) throws Exception {
+ JtaTransactionPolicy transactionPolicy = definition.getTransactionPolicy();
- @Override
- protected void cloneBuilder(DefaultErrorHandlerBuilder other) {
- super.cloneBuilder(other);
- if (other instanceof JtaTransactionErrorHandlerBuilder) {
- final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other;
- transactionPolicy = otherTx.transactionPolicy;
- rollbackLoggingLevel = otherTx.rollbackLoggingLevel;
- }
- }
-
- @Override
- public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception {
- CamelContext camelContext = routeContext.getCamelContext();
// resolve policy reference, if given
if (transactionPolicy == null) {
- if (policyRef != null) {
+ if (definition.getPolicyRef() != null) {
final TransactedDefinition transactedDefinition = new TransactedDefinition();
- transactedDefinition.setRef(policyRef);
+ transactedDefinition.setRef(definition.getPolicyRef());
final Policy policy = new TransactedReifier(camelContext, transactedDefinition).resolvePolicy();
if (policy != null) {
if (!(policy instanceof JtaTransactionPolicy)) {
- throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '"
- + policyRef.getClass().getName() + "' but an instance of '"
+ throw new RuntimeCamelException("The configured policy '" + definition.getPolicyRef()
+ + "' is of type '" + policy.getClass().getName() + "' but an instance of '"
+ JtaTransactionPolicy.class.getName() + "' is required!");
}
transactionPolicy = (JtaTransactionPolicy) policy;
@@ -97,10 +71,10 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde
// try to lookup default policy
if (transactionPolicy == null) {
- LOG.debug(
- "No transaction policy configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
+ LOG.debug("No transaction policy configured on TransactionErrorHandlerBuilder. "
+ + "Will try find it in the registry.");
- Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class);
+ Map<String, TransactedPolicy> mapPolicy = findByTypeWithName(TransactedPolicy.class);
if (mapPolicy != null && mapPolicy.size() == 1) {
TransactedPolicy policy = mapPolicy.values().iterator().next();
if (policy instanceof JtaTransactionPolicy) {
@@ -109,7 +83,7 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde
}
if (transactionPolicy == null) {
- TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class);
+ TransactedPolicy policy = lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class);
if (policy instanceof JtaTransactionPolicy) {
transactionPolicy = (JtaTransactionPolicy) policy;
}
@@ -123,49 +97,52 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde
ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this);
final Map<String, String> properties = camelContext.getGlobalOptions();
+ LoggingLevel rollbackLoggingLevel = definition.getRollbackLoggingLevel();
if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) {
rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY));
}
JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext,
processor,
- getLogger(),
- getOnRedelivery(),
- getRedeliveryPolicy(),
- getExceptionPolicyStrategy(),
+ definition.getLogger(),
+ definition.getOnRedelivery(),
+ definition.getRedeliveryPolicy(),
+ definition.getExceptionPolicyStrategy(),
transactionPolicy,
- getRetryWhilePolicy(camelContext),
- getExecutorService(camelContext),
+ definition.getRetryWhilePolicy(camelContext),
+ getExecutorService(),
rollbackLoggingLevel,
- getOnExceptionOccurred());
+ definition.getOnExceptionOccurred());
// configure error handler before we can use it
- configure(routeContext, answer);
+ configure(answer);
return answer;
}
- public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) {
- policyRef = ref;
- return this;
- }
-
- public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) {
- this.transactionPolicy = transactionPolicy;
- return this;
- }
-
- public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) {
- this.rollbackLoggingLevel = rollbackLoggingLevel;
- return this;
- }
-
- @Override
- protected CamelLogger createLogger() {
- return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR);
+ protected synchronized ScheduledExecutorService getExecutorService() {
+ ScheduledExecutorService executorService = definition.getExecutorService();
+ if (executorService == null || executorService.isShutdown()) {
+ // camel context will shutdown the executor when it shutdown so no
+ // need to shut it down when stopping
+ if (definition.getExecutorServiceRef() != null) {
+ executorService = lookupByNameAndType(definition.getExecutorServiceRef(), ScheduledExecutorService.class);
+ if (executorService == null) {
+ ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
+ ThreadPoolProfile profile = manager.getThreadPoolProfile(definition.getExecutorServiceRef());
+ executorService = manager.newScheduledThreadPool(this, definition.getExecutorServiceRef(), profile);
+ }
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
+ }
+ } else {
+ // no explicit configured thread pool, so leave it up to the
+ // error handler to decide if it need
+ // a default thread pool from
+ // CamelContext#getErrorHandlerExecutorService
+ executorService = null;
+ }
+ }
+ return executorService;
}
- @Override
- public String toString() {
- return "JtaTransactionErrorHandlerBuilder";
- }
}
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
index e19d650..124a483 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java
@@ -65,13 +65,11 @@ public abstract class JtaTransactionPolicy implements TransactedPolicy {
public Processor wrap(RouteContext routeContext, Processor processor) {
JtaTransactionErrorHandler answer;
// the goal is to configure the error handler builder on the route as a
- // transacted error handler,
- // either its already a transacted or if not we replace it with a
- // transacted one that we configure here
+ // transacted error handler. If the configured builder is not transacted,
+ // we replace it with a transacted one that we configure here
// and wrap the processor in the transacted error handler as we can have
- // transacted routes that change
- // propagation behavior, eg: from A required -> B -> requiresNew C
- // (advanced use-case)
+ // transacted routes that change propagation behavior,
+ // eg: from A required -> B -> requiresNew C (advanced use-case)
// if we should not support this we do not need to wrap the processor as
// we only need one transacted error handler
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java
index 11f8415..765360a 100644
--- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.hystrix.processor;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.engine.DefaultRouteContext;
+import org.apache.camel.impl.DefaultRouteContext;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java
index b191a31..105b3e2 100644
--- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java
@@ -17,7 +17,7 @@
package org.apache.camel.component.hystrix.processor;
import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.impl.engine.DefaultRouteContext;
+import org.apache.camel.impl.DefaultRouteContext;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.RouteDefinition;
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
index 1c7fc85..3aa044a 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
@@ -59,67 +59,6 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder {
return true;
}
- @Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- CamelContext camelContext = routeContext.getCamelContext();
- if (transactionTemplate == null) {
- // lookup in context if no transaction template has been configured
- LOG.debug("No TransactionTemplate configured on TransactionErrorHandlerBuilder. Will try find it in the registry.");
-
- Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class);
- if (mapPolicy != null && mapPolicy.size() == 1) {
- TransactedPolicy policy = mapPolicy.values().iterator().next();
- if (policy instanceof SpringTransactionPolicy) {
- transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate();
- }
- }
-
- if (transactionTemplate == null) {
- TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class);
- if (policy instanceof SpringTransactionPolicy) {
- transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate();
- }
- }
-
- if (transactionTemplate == null) {
- Map<String, TransactionTemplate> mapTemplate = camelContext.getRegistry().findByTypeWithName(TransactionTemplate.class);
- if (mapTemplate == null || mapTemplate.isEmpty()) {
- LOG.trace("No TransactionTemplate found in registry.");
- } else if (mapTemplate.size() == 1) {
- transactionTemplate = mapTemplate.values().iterator().next();
- } else {
- LOG.debug("Found {} TransactionTemplate in registry. Cannot determine which one to use. "
- + "Please configure a TransactionTemplate on the TransactionErrorHandlerBuilder", mapTemplate.size());
- }
- }
-
- if (transactionTemplate == null) {
- Map<String, PlatformTransactionManager> mapManager = camelContext.getRegistry().findByTypeWithName(PlatformTransactionManager.class);
- if (mapManager == null || mapManager.isEmpty()) {
- LOG.trace("No PlatformTransactionManager found in registry.");
- } else if (mapManager.size() == 1) {
- transactionTemplate = new TransactionTemplate(mapManager.values().iterator().next());
- } else {
- LOG.debug("Found {} PlatformTransactionManager in registry. Cannot determine which one to use for TransactionTemplate. "
- + "Please configure a TransactionTemplate on the TransactionErrorHandlerBuilder", mapManager.size());
- }
- }
-
- if (transactionTemplate != null) {
- LOG.debug("Found TransactionTemplate in registry to use: {}", transactionTemplate);
- }
- }
-
- ObjectHelper.notNull(transactionTemplate, "transactionTemplate", this);
-
- TransactionErrorHandler answer = new TransactionErrorHandler(camelContext, processor,
- getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), transactionTemplate,
- getRetryWhilePolicy(camelContext), getExecutorService(camelContext), getRollbackLoggingLevel(), getOnExceptionOccurred());
- // configure error handler before we can use it
- configure(routeContext, answer);
- return answer;
- }
-
public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
this.transactionTemplate = transactionTemplate;
}
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
index 23c80cc..42db380 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
@@ -21,9 +21,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.DefaultRouteContext;
import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.RouteContext;
import org.apache.camel.spring.example.DummyBean;
import org.apache.camel.support.CamelContextHelper;
import org.junit.Test;
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java b/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java
index 3ad380e..c7d4764 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java
@@ -16,23 +16,27 @@
*/
package org.apache.camel.spring.config;
+import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilderSupport;
+import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.processor.DelegateProcessor;
import org.springframework.beans.factory.BeanNameAware;
public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport implements BeanNameAware {
+
public static final String PROPERTY_NAME = "DummyErrorHandler";
- private String beanName;
- public DummyErrorHandlerBuilder() {
+ static {
+ ErrorHandlerReifier.registerReifier(DummyErrorHandlerBuilder.class, DummyErrorHandlerReifier::new);
}
- public DummyErrorHandlerBuilder(String beanName) {
- this.beanName = beanName;
+ private String beanName;
+
+ public DummyErrorHandlerBuilder() {
}
@Override
@@ -40,9 +44,8 @@ public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport impleme
this.beanName = beanName;
}
- @Override
- public boolean supportTransacted() {
- return false;
+ public String getBeanName() {
+ return beanName;
}
@Override
@@ -53,15 +56,22 @@ public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport impleme
return answer;
}
- @Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- return new DelegateProcessor(processor) {
- @Override
- public void process(Exchange exchange) throws Exception {
- exchange.setProperty(PROPERTY_NAME, beanName);
- super.process(exchange);
- }
- };
+ public static class DummyErrorHandlerReifier extends ErrorHandlerReifier<DummyErrorHandlerBuilder> {
+
+ public DummyErrorHandlerReifier(RouteContext routeContext, ErrorHandlerFactory definition) {
+ super(routeContext, (DummyErrorHandlerBuilder) definition);
+ }
+
+ @Override
+ public Processor createErrorHandler(Processor processor) throws Exception {
+ return new DelegateProcessor(processor) {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.setProperty(PROPERTY_NAME, definition.getBeanName());
+ super.process(exchange);
+ }
+ };
+ }
}
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java b/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java
index 62f5918..9e35c8a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java
@@ -16,32 +16,9 @@
*/
package org.apache.camel;
-import org.apache.camel.spi.RouteContext;
-
/**
* Factory for creating {@link org.apache.camel.processor.ErrorHandler}s.
*/
public interface ErrorHandlerFactory {
- /**
- * Creates the error handler
- *
- * @param routeContext the route context
- * @param processor the outer processor
- * @return the error handler
- * @throws Exception is thrown if the error handler could not be created
- */
- Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception;
-
- /**
- * Gets or lookup the target error handler factory.
- *
- * Will either get this current as the error handler factory, or in case this is a reference lookup
- * for a actual error handler, then this method will perform a lookup to return the actual error handler factory.
- *
- * @param routeContext the route context
- * @return the error handler factory.
- */
- ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext);
-
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
index 8513863..39e9b2c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
@@ -164,6 +164,8 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware {
ErrorHandlerFactory getErrorHandlerFactory();
+ Processor createErrorHandler(Processor processor) throws Exception;
+
void addAdvice(CamelInternalProcessorAdvice<?> advice);
void addProperty(String key, Object value);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java
similarity index 97%
rename from core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java
index 0ee844b..5bf48b0 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java
@@ -48,7 +48,8 @@ import org.apache.camel.util.ObjectHelper;
/**
* The context used to activate new routing rules
*/
-public class DefaultRouteContext implements RouteContext {
+public abstract class AbstractRouteContext implements RouteContext {
+
private NamedNode route;
private String routeId;
private Route runtimeRoute;
@@ -81,7 +82,7 @@ public class DefaultRouteContext implements RouteContext {
// must be concurrent as error handlers can be mutated concurrently via multicast/recipientlist EIPs
private ConcurrentMap<ErrorHandlerFactory, Set<NamedNode>> errorHandlers = new ConcurrentHashMap<>();
- public DefaultRouteContext(CamelContext camelContext, NamedNode route, String routeId) {
+ public AbstractRouteContext(CamelContext camelContext, NamedNode route, String routeId) {
this.camelContext = camelContext;
this.route = route;
this.routeId = routeId;
@@ -508,12 +509,12 @@ public class DefaultRouteContext implements RouteContext {
@Override
public void addErrorHandler(ErrorHandlerFactory factory, NamedNode onException) {
- getErrorHandlers(factory).add(onException);
+ doGetErrorHandlers(factory).add(onException);
}
@Override
public Set<NamedNode> getErrorHandlers(ErrorHandlerFactory factory) {
- return errorHandlers.computeIfAbsent(factory, f -> new LinkedHashSet<>());
+ return doGetErrorHandlers(factory);
}
public void removeErrorHandlers(ErrorHandlerFactory factory) {
@@ -522,10 +523,15 @@ public class DefaultRouteContext implements RouteContext {
@Override
public void addErrorHandlerFactoryReference(ErrorHandlerFactory source, ErrorHandlerFactory target) {
- Set<NamedNode> list = getErrorHandlers(source);
+ Set<NamedNode> list = doGetErrorHandlers(source);
Set<NamedNode> previous = errorHandlers.put(target, list);
if (list != previous && ObjectHelper.isNotEmpty(previous) && ObjectHelper.isNotEmpty(list)) {
throw new IllegalStateException("Multiple references with different handlers");
}
}
+
+ public Set<NamedNode> doGetErrorHandlers(ErrorHandlerFactory factory) {
+ return errorHandlers.computeIfAbsent(factory, f -> new LinkedHashSet<>());
+ }
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 7cadfdc..65f1587 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -41,7 +41,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
-import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
@@ -138,9 +137,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
* <p/>
* See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
*/
- static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {
+ static final class ErrorHandlerKey extends KeyValueHolder<RouteContext, Processor> {
- PreparedErrorHandler(RouteContext key, Processor value) {
+ ErrorHandlerKey(RouteContext key, Processor value) {
super(key, value);
}
@@ -163,7 +162,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
private ExecutorService aggregateExecutorService;
private boolean shutdownAggregateExecutorService;
private final long timeout;
- private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = new ConcurrentHashMap<>();
private final boolean shareUnitOfWork;
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
@@ -722,7 +721,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
// for the entire multicast block again which will start from scratch again
// create key for cache
- final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
+ final ErrorHandlerKey key = new ErrorHandlerKey(routeContext, processor);
// lookup cached first to reuse and preserve memory
answer = errorHandlers.get(key);
@@ -732,11 +731,10 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
}
LOG.trace("Creating error handler for: {}", processor);
- ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory();
// create error handler (create error handler directly to keep it light weight,
- // instead of using ProcessorDefinition.wrapInErrorHandler)
+ // instead of using ProcessorReifier.wrapInErrorHandler)
try {
- processor = createErrorHandler(routeContext, builder, exchange, processor);
+ processor = routeContext.createErrorHandler(processor);
// and wrap in unit of work processor so the copy exchange also can run under UoW
answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
@@ -764,13 +762,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
}
/**
- * Strategy to create the error handler from the builder
- */
- protected Processor createErrorHandler(RouteContext routeContext, ErrorHandlerFactory builder, Exchange exchange, Processor processor) throws Exception {
- return builder.createErrorHandler(routeContext, processor);
- }
-
- /**
* Strategy to create the unit of work to be used for the sub route
*
* @param routeContext the route context
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 66a22d3..a031939 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.processor;
-import java.io.UnsupportedEncodingException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -28,7 +25,6 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedCamelContext;
@@ -46,7 +42,6 @@ import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -247,8 +242,6 @@ public class RecipientListProcessor extends MulticastProcessor {
*/
protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer,
Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
- Processor prepared = producer;
-
// copy exchange, and do not share the unit of work
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
@@ -258,11 +251,11 @@ public class RecipientListProcessor extends MulticastProcessor {
}
// set property which endpoint we send to
- setToEndpoint(copy, prepared);
+ setToEndpoint(copy, producer);
// rework error handling to support fine grained error handling
RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
- prepared = createErrorHandler(routeContext, copy, prepared);
+ Processor prepared = createErrorHandler(routeContext, copy, producer);
// invoke on prepare on the exchange if specified
if (onPrepare != null) {
@@ -278,17 +271,14 @@ public class RecipientListProcessor extends MulticastProcessor {
}
@Override
- protected Processor createErrorHandler(RouteContext routeContext, ErrorHandlerFactory builder, Exchange exchange, Processor processor) throws Exception {
- // in case its a reference to another builder then we want the real builder
- final ErrorHandlerFactory ehBuilder = builder.getOrLookupErrorHandlerFactory(routeContext);
-
- Processor answer = super.createErrorHandler(routeContext, ehBuilder, exchange, processor);
+ protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
+ Processor answer = super.createErrorHandler(routeContext, exchange, processor);
exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
@Override
public void onDone(Exchange exchange) {
// remove error handler builder from route context as we are done with the recipient list
// and we cannot reuse this and must remove it to avoid leaking the error handler on the route context
- routeContext.removeErrorHandlers(ehBuilder);
+ routeContext.removeErrorHandlers(routeContext.getErrorHandlerFactory());
}
});
return answer;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java
index c9fe6a5..074e796 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java
@@ -88,6 +88,20 @@ public abstract class AdviceWithRouteBuilder extends RouteBuilder {
* @throws Exception can be thrown from the route builder
*/
public static RouteDefinition adviceWith(CamelContext camelContext, Object routeId, ThrowingConsumer<AdviceWithRouteBuilder, Exception> builder) throws Exception {
+ RouteDefinition rd = findRouteDefinition(camelContext, routeId);
+
+ return RouteReifier.adviceWith(rd, camelContext, new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ if (builder instanceof AdviceWithRouteBuilder) {
+ setLogRouteAsXml(((AdviceWithRouteBuilder) builder).isLogRouteAsXml());
+ }
+ builder.accept(this);
+ }
+ });
+ }
+
+ protected static RouteDefinition findRouteDefinition(CamelContext camelContext, Object routeId) {
ModelCamelContext mcc = camelContext.adapt(ModelCamelContext.class);
if (mcc.getRouteDefinitions().isEmpty()) {
throw new IllegalArgumentException("Cannot advice route as there are no routes");
@@ -115,16 +129,7 @@ public abstract class AdviceWithRouteBuilder extends RouteBuilder {
rd = mcc.getRouteDefinitions().get(0);
}
}
-
- return RouteReifier.adviceWith(rd, camelContext, new AdviceWithRouteBuilder() {
- @Override
- public void configure() throws Exception {
- if (builder instanceof AdviceWithRouteBuilder) {
- setLogRouteAsXml(((AdviceWithRouteBuilder) builder).isLogRouteAsXml());
- }
- builder.accept(this);
- }
- });
+ return rd;
}
/**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
index 3fdd549..108ba2b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
@@ -16,18 +16,14 @@
*/
package org.apache.camel.builder;
-import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.processor.FatalFallbackErrorHandler;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.errorhandler.DeadLetterChannel;
import org.apache.camel.spi.CamelLogger;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.StringHelper;
import org.slf4j.LoggerFactory;
/**
@@ -52,25 +48,6 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder {
}
@Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- validateDeadLetterUri(routeContext);
-
- CamelContext camelContext = routeContext.getCamelContext();
- DeadLetterChannel answer = new DeadLetterChannel(camelContext, processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
- getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(), isDeadLetterHandleNewException(),
- isUseOriginalMessage(), isUseOriginalBody(), getRetryWhilePolicy(camelContext),
- getExecutorService(camelContext), getOnPrepareFailure(), getOnExceptionOccurred());
- // configure error handler before we can use it
- configure(routeContext, answer);
- return answer;
- }
-
- @Override
- public boolean supportTransacted() {
- return false;
- }
-
- @Override
public ErrorHandlerBuilder cloneBuilder() {
DeadLetterChannelBuilder answer = new DeadLetterChannelBuilder();
super.cloneBuilder(answer);
@@ -93,16 +70,6 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder {
return failureProcessor;
}
- protected void validateDeadLetterUri(RouteContext routeContext) {
- if (deadLetter == null) {
- StringHelper.notEmpty(deadLetterUri, "deadLetterUri", this);
- deadLetter = routeContext.getCamelContext().getEndpoint(deadLetterUri);
- if (deadLetter == null) {
- throw new NoSuchEndpointException(deadLetterUri);
- }
- }
- }
-
@Override
protected CamelLogger createLogger() {
return new CamelLogger(LoggerFactory.getLogger(DeadLetterChannel.class), LoggingLevel.ERROR);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
index 67f5171..f893756 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
@@ -27,10 +27,7 @@ import org.apache.camel.Processor;
import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.apache.camel.processor.errorhandler.RedeliveryPolicy;
import org.apache.camel.spi.CamelLogger;
-import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.Language;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.ExpressionToPredicateAdapter;
import org.slf4j.LoggerFactory;
@@ -60,16 +57,6 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport {
}
@Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
- getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()),
- getExecutorService(routeContext.getCamelContext()), getOnPrepareFailure(), getOnExceptionOccurred());
- // configure error handler before we can use it
- configure(routeContext, answer);
- return answer;
- }
-
- @Override
public boolean supportTransacted() {
return false;
}
@@ -663,31 +650,6 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport {
return new CamelLogger(LoggerFactory.getLogger(DefaultErrorHandler.class), LoggingLevel.ERROR);
}
- protected synchronized ScheduledExecutorService getExecutorService(CamelContext camelContext) {
- if (executorService == null || executorService.isShutdown()) {
- // camel context will shutdown the executor when it shutdown so no
- // need to shut it down when stopping
- if (executorServiceRef != null) {
- executorService = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class);
- if (executorService == null) {
- ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
- ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef);
- executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile);
- }
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- } else {
- // no explicit configured thread pool, so leave it up to the
- // error handler to decide if it need
- // a default thread pool from
- // CamelContext#getErrorHandlerExecutorService
- executorService = null;
- }
- }
- return executorService;
- }
-
@Override
public String toString() {
return "DefaultErrorHandlerBuilder";
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
index f4c2eaf..0a7a6bc 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
@@ -16,12 +16,6 @@
*/
package org.apache.camel.builder;
-import org.apache.camel.ErrorHandlerFactory;
-import org.apache.camel.Processor;
-import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
-
/**
* Represents a proxy to an error handler builder which is resolved by named
* reference
@@ -35,17 +29,6 @@ public class ErrorHandlerBuilderRef extends ErrorHandlerBuilderSupport {
}
@Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- ErrorHandlerFactory handler = lookupErrorHandler(routeContext);
- return ErrorHandlerReifier.reifier(routeContext, handler).createErrorHandler(processor);
- }
-
- @Override
- public ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext) {
- return lookupErrorHandler(routeContext);
- }
-
- @Override
public boolean supportTransacted() {
return supportTransacted;
}
@@ -69,18 +52,6 @@ public class ErrorHandlerBuilderRef extends ErrorHandlerBuilderSupport {
return ref;
}
- private ErrorHandlerBuilder lookupErrorHandler(RouteContext routeContext) {
- ErrorHandlerBuilder handler = (ErrorHandlerBuilder)ErrorHandlerReifier.lookupErrorHandlerFactory(routeContext, getRef());
- ObjectHelper.notNull(handler, "error handler '" + ref + "'");
-
- // configure if the handler support transacted
- supportTransacted = handler.supportTransacted();
-
- routeContext.addErrorHandlerFactoryReference(this, handler);
-
- return handler;
- }
-
@Override
public String toString() {
return "ErrorHandlerBuilderRef[" + ref + "]";
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java
index d5b5ae8..66a042b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java
@@ -16,27 +16,8 @@
*/
package org.apache.camel.builder;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
import org.apache.camel.ErrorHandlerFactory;
-import org.apache.camel.NamedNode;
-import org.apache.camel.Predicate;
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.model.OnExceptionDefinition;
-import org.apache.camel.model.ProcessorDefinitionHelper;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.processor.ErrorHandler;
-import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
-import org.apache.camel.processor.errorhandler.ExceptionPolicy;
-import org.apache.camel.processor.errorhandler.ExceptionPolicyKey;
import org.apache.camel.processor.errorhandler.ExceptionPolicyStrategy;
-import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
-import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
-import org.apache.camel.spi.ClassResolver;
-import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
/**
@@ -45,89 +26,13 @@ import org.apache.camel.util.ObjectHelper;
public abstract class ErrorHandlerBuilderSupport implements ErrorHandlerBuilder {
private ExceptionPolicyStrategy exceptionPolicyStrategy;
- protected void cloneBuilder(ErrorHandlerBuilderSupport other) {
- other.exceptionPolicyStrategy = exceptionPolicyStrategy;
- }
-
@Override
- public ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext) {
- // only ErrorHandlerRef should override to lookup
- return this;
+ public boolean supportTransacted() {
+ return false;
}
- /**
- * Configures the other error handler based on this error handler.
- *
- * @param routeContext the route context
- * @param handler the other error handler
- */
- public void configure(RouteContext routeContext, ErrorHandler handler) {
- if (handler instanceof ErrorHandlerSupport) {
- ErrorHandlerSupport handlerSupport = (ErrorHandlerSupport)handler;
-
- Set<NamedNode> list = routeContext.getErrorHandlers(this);
- for (NamedNode exception : list) {
- addExceptionPolicy(handlerSupport, routeContext, (OnExceptionDefinition)exception);
- }
- }
- if (handler instanceof RedeliveryErrorHandler) {
- RedeliveryErrorHandler reh = (RedeliveryErrorHandler)handler;
- boolean original = reh.isUseOriginalMessagePolicy() || reh.isUseOriginalBodyPolicy();
- if (original) {
- if (reh.isUseOriginalMessagePolicy() && reh.isUseOriginalBodyPolicy()) {
- throw new IllegalArgumentException("Cannot set both useOriginalMessage and useOriginalBody on the error handler");
- }
- // ensure allow original is turned on
- routeContext.setAllowUseOriginalMessage(true);
- }
- }
- }
-
- public static void addExceptionPolicy(ErrorHandlerSupport handlerSupport, RouteContext routeContext, OnExceptionDefinition exceptionType) {
- if (routeContext != null) {
- // add error handler as child service so they get lifecycle handled
- Processor errorHandler = routeContext.getOnException(exceptionType.getId());
- handlerSupport.addErrorHandler(errorHandler);
-
- // load exception classes
- List<Class<? extends Throwable>> list;
- if (ObjectHelper.isNotEmpty(exceptionType.getExceptions())) {
- list = createExceptionClasses(exceptionType, routeContext.getCamelContext().getClassResolver());
- for (Class<? extends Throwable> clazz : list) {
- String routeId = null;
- // only get the route id, if the exception type is route
- // scoped
- if (exceptionType.isRouteScoped()) {
- RouteDefinition route = ProcessorDefinitionHelper.getRoute(exceptionType);
- if (route != null) {
- routeId = route.getId();
- }
- }
- Predicate when = exceptionType.getOnWhen() != null ? exceptionType.getOnWhen().getExpression() : null;
- ExceptionPolicyKey key = new ExceptionPolicyKey(routeId, clazz, when);
- ExceptionPolicy policy = toExceptionPolicy(exceptionType, routeContext);
- handlerSupport.addExceptionPolicy(key, policy);
- }
- }
- }
- }
-
- protected static ExceptionPolicy toExceptionPolicy(OnExceptionDefinition exceptionType, RouteContext routeContext) {
- return ErrorHandlerReifier.createExceptionPolicy(exceptionType, routeContext.getCamelContext());
- }
-
- protected static List<Class<? extends Throwable>> createExceptionClasses(OnExceptionDefinition exceptionType, ClassResolver resolver) {
- List<String> list = exceptionType.getExceptions();
- List<Class<? extends Throwable>> answer = new ArrayList<>(list.size());
- for (String name : list) {
- try {
- Class<? extends Throwable> type = resolver.resolveMandatoryClass(name, Throwable.class);
- answer.add(type);
- } catch (ClassNotFoundException e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
- return answer;
+ protected void cloneBuilder(ErrorHandlerBuilderSupport other) {
+ other.exceptionPolicyStrategy = exceptionPolicyStrategy;
}
/**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
index 3565d35..85aa900 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
@@ -16,12 +16,7 @@
*/
package org.apache.camel.builder;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.support.processor.DelegateAsyncProcessor;
/**
* A builder to disable the use of an error handler so that any exceptions are
@@ -34,31 +29,6 @@ import org.apache.camel.support.processor.DelegateAsyncProcessor;
public class NoErrorHandlerBuilder extends ErrorHandlerBuilderSupport {
@Override
- public Processor createErrorHandler(RouteContext routeContext, Processor processor) {
- return new DelegateAsyncProcessor(processor) {
- @Override
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- return super.process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean doneSync) {
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
- callback.done(doneSync);
- }
- });
- }
-
- @Override
- public String toString() {
- if (processor == null) {
- // if no output then dont do any description
- return "";
- }
- return "NoErrorHandler[" + processor + "]";
- }
- };
- }
-
- @Override
public boolean supportTransacted() {
return false;
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
index ebcfe43..1593bb3 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -31,7 +31,6 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToStartRouteException;
import org.apache.camel.Route;
import org.apache.camel.impl.engine.AbstractCamelContext;
-import org.apache.camel.impl.engine.DefaultRouteContext;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
similarity index 60%
copy from core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java
copy to core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
index c7c466c..821f74a 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
@@ -14,22 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.reifier.errorhandler;
+package org.apache.camel.impl;
-import org.apache.camel.ErrorHandlerFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.impl.engine.AbstractRouteContext;
+import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
import org.apache.camel.spi.RouteContext;
-public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuilderRef> {
+/**
+ * The context used to activate new routing rules
+ */
+public class DefaultRouteContext extends AbstractRouteContext implements RouteContext {
- public ErrorHandlerRefReifier(RouteContext routeContext, ErrorHandlerFactory definition) {
- super(routeContext, (ErrorHandlerBuilderRef)definition);
+ public DefaultRouteContext(CamelContext camelContext, NamedNode route, String routeId) {
+ super(camelContext, route, routeId);
}
@Override
public Processor createErrorHandler(Processor processor) throws Exception {
- return definition.createErrorHandler(routeContext, processor);
+ return ErrorHandlerReifier.reifier(this, getErrorHandlerFactory())
+ .createErrorHandler(processor);
}
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java
index bbf1f22..b84af32 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java
@@ -17,13 +17,11 @@
package org.apache.camel.reifier;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.DynamicRouterDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.DynamicRouter;
-import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
import org.apache.camel.spi.RouteContext;
public class DynamicRouterReifier extends ExpressionReifier<DynamicRouterDefinition<?>> {
@@ -45,12 +43,10 @@ public class DynamicRouterReifier extends ExpressionReifier<DynamicRouterDefinit
dynamicRouter.setCacheSize(parseInt(definition.getCacheSize()));
}
- // and wrap this in an error handler
- ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory();
// create error handler (create error handler directly to keep it light
// weight,
// instead of using ProcessorReifier.wrapInErrorHandler)
- AsyncProcessor errorHandler = (AsyncProcessor)ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(dynamicRouter.newRoutingSlipProcessorForErrorHandler());
+ AsyncProcessor errorHandler = (AsyncProcessor) routeContext.createErrorHandler(dynamicRouter.newRoutingSlipProcessorForErrorHandler());
dynamicRouter.setErrorHandler(errorHandler);
return dynamicRouter;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index c693938..8012339 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -661,7 +661,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
if (inheritErrorHandler == null || inheritErrorHandler) {
log.trace("{} is configured to inheritErrorHandler", definition);
Processor output = channel.getOutput();
- Processor errorHandler = wrapInErrorHandler(output);
+ Processor errorHandler = wrapInErrorHandler(output, true);
// set error handler on channel
channel.setErrorHandler(errorHandler);
} else {
@@ -673,17 +673,21 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
* Wraps the given output in an error handler
*
* @param output the output
+ * @param longLived whether the error handler is long lived; only then are the lifecycle strategies notified via onErrorHandlerAdd
* @return the output wrapped with the error handler
* @throws Exception can be thrown if failed to create error handler builder
*/
- protected Processor wrapInErrorHandler(Processor output) throws Exception {
+ protected Processor wrapInErrorHandler(Processor output, boolean longLived) throws Exception {
ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory();
+
// create error handler
Processor errorHandler = ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(output);
- // invoke lifecycles so we can manage this error handler builder
- for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
- strategy.onErrorHandlerAdd(routeContext, errorHandler, builder);
+ if (longLived) {
+ // invoke lifecycles so we can manage this error handler builder
+ for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
+ strategy.onErrorHandlerAdd(routeContext, errorHandler, builder);
+ }
}
return errorHandler;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
index 81aac98..9ad47d8 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
@@ -99,7 +99,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti
// special error handling
// when sending to the recipients individually
Processor evalProcessor = new EvaluateExpressionProcessor(expression);
- evalProcessor = super.wrapInErrorHandler(evalProcessor);
+ evalProcessor = wrapInErrorHandler(evalProcessor, true);
pipe.add(evalProcessor);
pipe.add(answer);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 7b5b098..c403cc1 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -24,7 +24,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToCreateRouteException;
-import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java
index 29db4da..611392d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java
@@ -17,13 +17,11 @@
package org.apache.camel.reifier;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RoutingSlipDefinition;
import org.apache.camel.processor.RoutingSlip;
-import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
import org.apache.camel.spi.RouteContext;
import static org.apache.camel.model.RoutingSlipDefinition.DEFAULT_DELIMITER;
@@ -51,11 +49,8 @@ public class RoutingSlipReifier extends ExpressionReifier<RoutingSlipDefinition<
}
// and wrap this in an error handler
- ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory();
- // create error handler (create error handler directly to keep it light
- // weight,
- // instead of using ProcessorDefinition.wrapInErrorHandler)
- AsyncProcessor errorHandler = (AsyncProcessor)ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(routingSlip.newRoutingSlipProcessorForErrorHandler());
+ AsyncProcessor processor = routingSlip.newRoutingSlipProcessorForErrorHandler();
+ AsyncProcessor errorHandler = (AsyncProcessor) wrapInErrorHandler(processor, false);
routingSlip.setErrorHandler(errorHandler);
return routingSlip;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 9a69586..063b543 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -49,7 +49,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
SendDynamicProcessor dynamicTo = (SendDynamicProcessor)super.createProcessor();
// create error handler we need to use for processing the wire tapped
- Processor target = wrapInErrorHandler(dynamicTo);
+ Processor target = wrapInErrorHandler(dynamicTo, true);
// and wrap in unit of work
CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java
index c7c466c..df793fb 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java
@@ -20,6 +20,7 @@ import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Processor;
import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuilderRef> {
@@ -29,7 +30,16 @@ public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuil
@Override
public Processor createErrorHandler(Processor processor) throws Exception {
- return definition.createErrorHandler(routeContext, processor);
+ ErrorHandlerFactory handler = lookupErrorHandler(routeContext);
+ return ErrorHandlerReifier.reifier(routeContext, handler).createErrorHandler(processor);
+ }
+
+ private ErrorHandlerFactory lookupErrorHandler(RouteContext routeContext) {
+ ErrorHandlerFactory handler =
+ ErrorHandlerReifier.lookupErrorHandlerFactory(routeContext, definition.getRef());
+ ObjectHelper.notNull(handler, "error handler '" + definition.getRef() + "'");
+ routeContext.addErrorHandlerFactoryReference(definition, handler);
+ return handler;
}
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
index c769767..f58379c 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.reifier.errorhandler;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
@@ -35,16 +37,17 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.ErrorHandlerBuilderSupport;
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.model.OnExceptionDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RedeliveryPolicyDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.ErrorHandler;
import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
import org.apache.camel.processor.errorhandler.ExceptionPolicy;
import org.apache.camel.processor.errorhandler.ExceptionPolicy.RedeliveryOption;
+import org.apache.camel.processor.errorhandler.ExceptionPolicyKey;
import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
import org.apache.camel.processor.errorhandler.RedeliveryPolicy;
import org.apache.camel.reifier.AbstractReifier;
-import org.apache.camel.reifier.language.ExpressionReifier;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;
@@ -80,44 +83,34 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport>
BiFunction<RouteContext, ErrorHandlerFactory, ErrorHandlerReifier<? extends ErrorHandlerFactory>> reifier = ERROR_HANDLERS.get(definition.getClass());
if (reifier != null) {
return reifier.apply(routeContext, definition);
- } else if (definition instanceof ErrorHandlerBuilderSupport) {
- return new ErrorHandlerReifier<ErrorHandlerBuilderSupport>(routeContext, (ErrorHandlerBuilderSupport)definition) {
- @Override
- public Processor createErrorHandler(Processor processor) throws Exception {
- return definition.createErrorHandler(routeContext, processor);
- }
- };
- } else {
- throw new IllegalStateException("Unsupported definition: " + definition);
}
+ throw new IllegalStateException("Unsupported definition: " + definition);
}
- public static ExceptionPolicy createExceptionPolicy(OnExceptionDefinition def, CamelContext camelContext) {
+ public ExceptionPolicy createExceptionPolicy(OnExceptionDefinition def) {
Predicate handled = def.getHandledPolicy();
if (handled == null && def.getHandled() != null) {
- handled = ExpressionReifier.reifier(camelContext, def.getHandled()).createPredicate();
+ handled = createPredicate(def.getHandled());
}
Predicate continued = def.getContinuedPolicy();
if (continued == null && def.getContinued() != null) {
- continued = ExpressionReifier.reifier(camelContext, def.getContinued()).createPredicate();
+ continued = createPredicate(def.getContinued());
}
Predicate retryWhile = def.getRetryWhilePolicy();
if (retryWhile == null && def.getRetryWhile() != null) {
- retryWhile = ExpressionReifier.reifier(camelContext, def.getRetryWhile()).createPredicate();
+ retryWhile = createPredicate(def.getRetryWhile());
}
Processor onRedelivery = def.getOnRedelivery();
if (onRedelivery == null && def.getOnRedeliveryRef() != null) {
- onRedelivery = CamelContextHelper.mandatoryLookup(camelContext,
- CamelContextHelper.parseText(camelContext, def.getOnRedeliveryRef()), Processor.class);
+ onRedelivery = mandatoryLookup(parseString(def.getOnRedeliveryRef()), Processor.class);
}
Processor onExceptionOccurred = def.getOnExceptionOccurred();
if (onExceptionOccurred == null && def.getOnExceptionOccurredRef() != null) {
- onExceptionOccurred = CamelContextHelper.mandatoryLookup(camelContext,
- CamelContextHelper.parseText(camelContext, def.getOnExceptionOccurredRef()), Processor.class);
+ onExceptionOccurred = mandatoryLookup(parseString(def.getOnExceptionOccurredRef()), Processor.class);
}
return new ExceptionPolicy(def.getId(), CamelContextHelper.getRouteId(def),
- def.getUseOriginalMessage() != null && CamelContextHelper.parseBoolean(camelContext, def.getUseOriginalMessage()),
- def.getUseOriginalBody() != null && CamelContextHelper.parseBoolean(camelContext, def.getUseOriginalBody()),
+ parseBoolean(def.getUseOriginalMessage(), false),
+ parseBoolean(def.getUseOriginalBody(), false),
ObjectHelper.isNotEmpty(def.getOutputs()), handled,
continued, retryWhile, onRedelivery,
onExceptionOccurred, def.getRedeliveryPolicyRef(),
@@ -262,6 +255,47 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport>
return !DEFAULT_ERROR_HANDLER_BUILDER.equals(ref);
}
+ public void addExceptionPolicy(ErrorHandlerSupport handlerSupport, OnExceptionDefinition exceptionType) {
+ // add error handler as child service so they get lifecycle handled
+ Processor errorHandler = routeContext.getOnException(exceptionType.getId());
+ handlerSupport.addErrorHandler(errorHandler);
+
+ // load exception classes
+ List<Class<? extends Throwable>> list;
+ if (ObjectHelper.isNotEmpty(exceptionType.getExceptions())) {
+ list = createExceptionClasses(exceptionType);
+ for (Class<? extends Throwable> clazz : list) {
+ String routeId = null;
+ // only get the route id, if the exception type is route
+ // scoped
+ if (exceptionType.isRouteScoped()) {
+ RouteDefinition route = ProcessorDefinitionHelper.getRoute(exceptionType);
+ if (route != null) {
+ routeId = route.getId();
+ }
+ }
+ Predicate when = exceptionType.getOnWhen() != null ? exceptionType.getOnWhen().getExpression() : null;
+ ExceptionPolicyKey key = new ExceptionPolicyKey(routeId, clazz, when);
+ ExceptionPolicy policy = createExceptionPolicy(exceptionType);
+ handlerSupport.addExceptionPolicy(key, policy);
+ }
+ }
+ }
+
+ protected List<Class<? extends Throwable>> createExceptionClasses(OnExceptionDefinition exceptionType) {
+ List<String> list = exceptionType.getExceptions();
+ List<Class<? extends Throwable>> answer = new ArrayList<>(list.size());
+ for (String name : list) {
+ try {
+ Class<? extends Throwable> type = camelContext.getClassResolver().resolveMandatoryClass(name, Throwable.class);
+ answer.add(type);
+ } catch (ClassNotFoundException e) {
+ throw RuntimeCamelException.wrapRuntimeCamelException(e);
+ }
+ }
+ return answer;
+ }
+
/**
* Creates the error handler
*
@@ -276,7 +310,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport>
ErrorHandlerSupport handlerSupport = (ErrorHandlerSupport)handler;
for (NamedNode exception : routeContext.getErrorHandlers(definition)) {
- ErrorHandlerBuilderSupport.addExceptionPolicy(handlerSupport, routeContext, (OnExceptionDefinition)exception);
+ addExceptionPolicy(handlerSupport, (OnExceptionDefinition) exception);
}
}
if (handler instanceof RedeliveryErrorHandler) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java
index 87922b6..9ecc356 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java
@@ -22,9 +22,10 @@ import java.util.List;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ErrorHandlerBuilderSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.OnExceptionDefinition;
+import org.apache.camel.reifier.errorhandler.DefaultErrorHandlerReifier;
+import org.apache.camel.spi.RouteContext;
import org.junit.Test;
public class ErrorHandlerSupportTest extends ContextTestSupport {
@@ -36,7 +37,7 @@ public class ErrorHandlerSupportTest extends ContextTestSupport {
exceptions.add(ParentException.class);
ErrorHandlerSupport support = new ShuntErrorHandlerSupport();
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions));
assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0));
assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 1));
@@ -49,7 +50,7 @@ public class ErrorHandlerSupportTest extends ContextTestSupport {
exceptions.add(ChildException.class);
ErrorHandlerSupport support = new ShuntErrorHandlerSupport();
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions));
assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 1));
assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0));
@@ -58,8 +59,8 @@ public class ErrorHandlerSupportTest extends ContextTestSupport {
@Test
public void testTwoPolicyChildFirst() {
ErrorHandlerSupport support = new ShuntErrorHandlerSupport();
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class));
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class));
assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0));
assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0));
@@ -68,13 +69,17 @@ public class ErrorHandlerSupportTest extends ContextTestSupport {
@Test
public void testTwoPolicyChildLast() {
ErrorHandlerSupport support = new ShuntErrorHandlerSupport();
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class));
- ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class));
+ addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class));
assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0));
assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0));
}
+ private static void addExceptionPolicy(ErrorHandlerSupport handlerSupport, RouteContext routeContext, OnExceptionDefinition exceptionType) {
+ new DefaultErrorHandlerReifier<>(routeContext, null).addExceptionPolicy(handlerSupport, exceptionType);
+ }
+
private static String getExceptionPolicyFor(ErrorHandlerSupport support, Throwable childException, int index) {
return support.getExceptionPolicy(null, childException).getExceptions().get(index);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
index 24bd5b1..62ee77c 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
@@ -24,15 +24,20 @@ import java.net.SocketException;
import java.util.HashMap;
import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ValidationException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultRouteContext;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.processor.errorhandler.DefaultExceptionPolicyStrategy;
import org.apache.camel.processor.errorhandler.ExceptionPolicy;
import org.apache.camel.processor.errorhandler.ExceptionPolicyKey;
+import org.apache.camel.reifier.errorhandler.DefaultErrorHandlerReifier;
import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
+import org.apache.camel.spi.RouteContext;
import org.junit.Assert;
import org.junit.Test;
@@ -48,7 +53,10 @@ public class DefaultExceptionPolicyStrategyTest extends Assert {
private ExceptionPolicy type3;
private ExceptionPolicy exceptionPolicy(Class<? extends Throwable> exceptionClass) {
- return ErrorHandlerReifier.createExceptionPolicy(new OnExceptionDefinition(exceptionClass), null);
+ CamelContext cc = new DefaultCamelContext();
+ RouteContext context = new DefaultRouteContext(cc, null, null);
+ return new DefaultErrorHandlerReifier<>(context, null)
+ .createExceptionPolicy(new OnExceptionDefinition(exceptionClass));
}
private void setupPolicies() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java b/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java
index e98248d..9d7d481 100644
--- a/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.reifier;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.dataformat.CustomDataFormat;
import org.apache.camel.reifier.dataformat.CustomDataFormatReifier;
import org.apache.camel.reifier.dataformat.DataFormatReifier;
@@ -27,17 +28,18 @@ import static junit.framework.TestCase.fail;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class DataFormatReifierTest {
+
@Test
public void testHandleCustomDataFormat() {
+ DefaultCamelContext context = new DefaultCamelContext();
try {
- DataFormatReifier.reifier(null, new MyDataFormat());
-
+ DataFormatReifier.reifier(context, new MyDataFormat());
fail("Should throw IllegalStateException instead");
} catch (IllegalStateException e) {
}
DataFormatReifier.registerReifier(MyDataFormat.class, CustomDataFormatReifier::new);
- DataFormatReifier.reifier(null, new MyDataFormat());
+ DataFormatReifier.reifier(context, new MyDataFormat());
}
public static class MyDataFormat extends CustomDataFormat {
diff --git a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
index d6a0c86..1deeae8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.reifier;
-import org.apache.camel.impl.engine.DefaultRouteContext;
+import org.apache.camel.impl.DefaultRouteContext;
import org.apache.camel.model.ProcessDefinition;
import org.apache.camel.spi.RouteContext;
import org.junit.Test;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
index 20589ee..75bb00b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
@@ -27,7 +27,6 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.NoSuchEndpointException;
-import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.RouteStartupOrder;