You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2019/05/21 08:47:27 UTC
[camel] 05/08: [CAMEL-13371] Move DefaultRouteContext and some other classes to camel-base
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit b51384dd1f7d4897cf52c709309cad413d29bb5c
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon May 20 22:35:59 2019 +0200
[CAMEL-13371] Move DefaultRouteContext and some other classes to camel-base
---
.../blueprint/handler/CamelNamespaceHandler.java | 2 +-
.../camel/cdi/CdiCamelBeanPostProcessor.java | 4 +-
.../camel/spring/CamelBeanPostProcessor.java | 4 +-
.../apache/camel/spring/EndpointReferenceTest.java | 5 +-
.../config/CustomExecutorServiceManager.java | 4 +-
.../apache/camel/test/junit4/CamelTestSupport.java | 2 +-
.../camel/spi}/CamelInternalProcessorAdvice.java | 52 +-
.../java/org/apache/camel/spi/RouteContext.java | 8 +
.../impl/engine/BaseExecutorServiceManager.java} | 29 +-
.../impl/engine}/CamelPostProcessorHelper.java | 6 +-
.../engine}/DefaultCamelBeanPostProcessor.java | 2 +-
.../camel/impl/engine}/DefaultRouteContext.java | 109 ++---
.../camel/processor/CamelInternalProcessor.java | 53 ++-
.../org/apache/camel/processor/ContractAdvice.java | 1 +
.../apache/camel/processor/RestBindingAdvice.java | 1 +
.../processor/SharedCamelInternalProcessor.java | 1 +
.../org/apache/camel/impl/DefaultCamelContext.java | 2 +-
.../camel/impl/DefaultExecutorServiceManager.java | 521 +--------------------
.../java/org/apache/camel/impl/DefaultModel.java | 4 +-
.../camel/processor/channel/DefaultChannel.java | 4 +-
.../org/apache/camel/reifier/RouteReifier.java | 77 +++
...melPostProcessorHelperConsumePredicateTest.java | 2 +-
...amelPostProcessorHelperConsumePropertyTest.java | 2 +-
...ostProcessorHelperSedaConsumePredicateTest.java | 2 +-
.../{ => engine}/CamelPostProcessorHelperTest.java | 4 +-
.../{ => engine}/CustomThreadPoolFactoryTest.java | 4 +-
26 files changed, 225 insertions(+), 680 deletions(-)
diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
index 0aa371d..568395a 100644
--- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
+++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
@@ -63,7 +63,7 @@ import org.apache.camel.blueprint.CamelEndpointFactoryBean;
import org.apache.camel.blueprint.CamelRestContextFactoryBean;
import org.apache.camel.blueprint.CamelRouteContextFactoryBean;
import org.apache.camel.core.xml.AbstractCamelFactoryBean;
-import org.apache.camel.impl.CamelPostProcessorHelper;
+import org.apache.camel.impl.engine.CamelPostProcessorHelper;
import org.apache.camel.impl.engine.DefaultCamelContextNameStrategy;
import org.apache.camel.model.AggregateDefinition;
import org.apache.camel.model.CatchDefinition;
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelBeanPostProcessor.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelBeanPostProcessor.java
index e2d42ac..4d33020 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelBeanPostProcessor.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelBeanPostProcessor.java
@@ -28,8 +28,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.PropertyInject;
-import org.apache.camel.impl.CamelPostProcessorHelper;
-import org.apache.camel.impl.DefaultCamelBeanPostProcessor;
+import org.apache.camel.impl.engine.CamelPostProcessorHelper;
+import org.apache.camel.impl.engine.DefaultCamelBeanPostProcessor;
import org.apache.camel.util.ReflectionHelper;
import static org.apache.camel.cdi.BeanManagerHelper.getReferenceByType;
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelBeanPostProcessor.java b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelBeanPostProcessor.java
index b9d610d..c817b7b 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelBeanPostProcessor.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelBeanPostProcessor.java
@@ -29,8 +29,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Service;
import org.apache.camel.core.xml.CamelJMXAgentDefinition;
-import org.apache.camel.impl.CamelPostProcessorHelper;
-import org.apache.camel.impl.DefaultCamelBeanPostProcessor;
+import org.apache.camel.impl.engine.CamelPostProcessorHelper;
+import org.apache.camel.impl.engine.DefaultCamelBeanPostProcessor;
import org.apache.camel.spi.Metadata;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
index d172ef5..eed26b8 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java
@@ -20,7 +20,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultRouteContext;
+import org.apache.camel.impl.engine.DefaultRouteContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spring.example.DummyBean;
@@ -74,7 +74,8 @@ public class EndpointReferenceTest extends SpringTestSupport {
@Test
public void testReferenceEndpointFromOtherCamelContext() throws Exception {
CamelContext context = applicationContext.getBean("camel2", CamelContext.class);
- RouteContext routeContext = new DefaultRouteContext(context, new RouteDefinition("temporary"));
+ RouteDefinition route = new RouteDefinition("temporary");
+ RouteContext routeContext = new DefaultRouteContext(context, route, route.idOrCreate(context.getNodeIdFactory()));
try {
routeContext.resolveEndpoint(null, "endpoint1");
fail("Should have thrown exception");
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java b/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java
index e46de53..9169103 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java
@@ -17,9 +17,9 @@
package org.apache.camel.spring.config;
import org.apache.camel.CamelContext;
-import org.apache.camel.impl.DefaultExecutorServiceManager;
+import org.apache.camel.impl.engine.BaseExecutorServiceManager;
-public class CustomExecutorServiceManager extends DefaultExecutorServiceManager {
+public class CustomExecutorServiceManager extends BaseExecutorServiceManager {
public CustomExecutorServiceManager(CamelContext camelContext) {
super(camelContext);
diff --git a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
index d74f0ab..dede8ed 100644
--- a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
+++ b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
@@ -71,7 +71,7 @@ import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
-import org.apache.camel.impl.DefaultCamelBeanPostProcessor;
+import org.apache.camel.impl.engine.DefaultCamelBeanPostProcessor;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.impl.engine.InterceptSendToMockEndpointStrategy;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessorAdvice.java b/core/camel-api/src/main/java/org/apache/camel/spi/CamelInternalProcessorAdvice.java
similarity index 52%
rename from core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessorAdvice.java
rename to core/camel-api/src/main/java/org/apache/camel/spi/CamelInternalProcessorAdvice.java
index 03ecbee..85728ee 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessorAdvice.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelInternalProcessorAdvice.java
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor;
+package org.apache.camel.spi;
import org.apache.camel.Exchange;
-import org.apache.camel.Ordered;
-import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
/**
* An advice (before and after) to execute cross cutting functionality in the Camel routing engine.
@@ -49,52 +47,4 @@ public interface CamelInternalProcessorAdvice<T> {
*/
void after(Exchange exchange, T data) throws Exception;
- /**
- * Wrap an InstrumentationProcessor into a CamelInternalProcessorAdvice
- */
- static <T> CamelInternalProcessorAdvice<T> wrap(InstrumentationProcessor<T> instrumentationProcessor) {
-
- if (instrumentationProcessor instanceof CamelInternalProcessor) {
- return (CamelInternalProcessorAdvice<T>) instrumentationProcessor;
- } else {
- return new CamelInternalProcessorAdviceWrapper<T>(instrumentationProcessor);
- }
- }
-
- static Object unwrap(CamelInternalProcessorAdvice<?> advice) {
- if (advice instanceof CamelInternalProcessorAdviceWrapper) {
- return ((CamelInternalProcessorAdviceWrapper) advice).unwrap();
- } else {
- return advice;
- }
- }
-
- class CamelInternalProcessorAdviceWrapper<T> implements CamelInternalProcessorAdvice<T>, Ordered {
-
- final InstrumentationProcessor<T> instrumentationProcessor;
-
- public CamelInternalProcessorAdviceWrapper(InstrumentationProcessor<T> instrumentationProcessor) {
- this.instrumentationProcessor = instrumentationProcessor;
- }
-
- InstrumentationProcessor<T> unwrap() {
- return instrumentationProcessor;
- }
-
- @Override
- public int getOrder() {
- return instrumentationProcessor.getOrder();
- }
-
- @Override
- public T before(Exchange exchange) throws Exception {
- return instrumentationProcessor.before(exchange);
- }
-
- @Override
- public void after(Exchange exchange, T data) throws Exception {
- instrumentationProcessor.after(exchange, data);
- }
- }
-
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
index f931785..d67feef 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java
@@ -197,10 +197,18 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware {
*/
Boolean isAutoStartup();
+ void setStartupOrder(Integer startupOrder);
+
Integer getStartupOrder();
+ void setErrorHandlerFactory(ErrorHandlerFactory errorHandlerFactory);
+
ErrorHandlerFactory getErrorHandlerFactory();
+ void addAdvice(CamelInternalProcessorAdvice<?> advice);
+
+ void addProperty(String key, Object value);
+
/**
* Gets the last error.
*
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
similarity index 95%
copy from core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
copy to core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index 532fd68..67c4e56 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -30,17 +30,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.StaticService;
-import org.apache.camel.model.OptionalIdentifiedDefinition;
-import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.ProcessorDefinitionHelper;
-import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultThreadPoolFactory;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -59,8 +55,8 @@ import org.slf4j.LoggerFactory;
* Default {@link org.apache.camel.spi.ExecutorServiceManager}.
*
*/
-public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);
+public class BaseExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseExecutorServiceManager.class);
private final CamelContext camelContext;
private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
@@ -71,7 +67,7 @@ public class DefaultExecutorServiceManager extends ServiceSupport implements Exe
private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<>();
private ThreadPoolProfile defaultProfile;
- public DefaultExecutorServiceManager(CamelContext camelContext) {
+ public BaseExecutorServiceManager(CamelContext camelContext) {
this.camelContext = camelContext;
defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
@@ -87,6 +83,10 @@ public class DefaultExecutorServiceManager extends ServiceSupport implements Exe
registerThreadPoolProfile(defaultProfile);
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
@Override
public ThreadPoolFactory getThreadPoolFactory() {
return threadPoolFactory;
@@ -501,7 +501,7 @@ public class DefaultExecutorServiceManager extends ServiceSupport implements Exe
// extract id from source
if (source instanceof NamedNode) {
- id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
+ id = ((NamedNode) source).getId();
// and let source be the short name of the pattern
sourceId = ((NamedNode) source).getShortName();
} else if (source instanceof String) {
@@ -523,11 +523,8 @@ public class DefaultExecutorServiceManager extends ServiceSupport implements Exe
StringHelper.notEmpty(id, "id for thread pool " + executorService);
// extract route id if possible
- if (source instanceof ProcessorDefinition) {
- RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
- if (route != null) {
- routeId = route.idOrCreate(this.camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
- }
+ if (source instanceof NamedNode) {
+ routeId = CamelContextHelper.getRouteId((NamedNode) source);
}
// let lifecycle strategy be notified as well which can let it be managed in JMX as well
@@ -547,7 +544,7 @@ public class DefaultExecutorServiceManager extends ServiceSupport implements Exe
onNewExecutorService(executorService);
}
- private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
+ protected ThreadFactory createThreadFactory(String name, boolean isDaemon) {
return new CamelThreadFactory(threadNamePattern, name, isDaemon);
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelPostProcessorHelper.java
similarity index 98%
rename from core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelPostProcessorHelper.java
index 569b924..eaa87b4 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/CamelPostProcessorHelper.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.reflect.Method;
import java.util.Set;
@@ -37,10 +37,6 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.ProxyInstantiationException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
-import org.apache.camel.impl.engine.DefaultConsumerTemplate;
-import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
-import org.apache.camel.impl.engine.DefaultProducerTemplate;
-import org.apache.camel.impl.engine.SubscribeMethodProcessor;
import org.apache.camel.spi.BeanProxyFactory;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.IntrospectionSupport;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelBeanPostProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultCamelBeanPostProcessor.java
similarity index 99%
rename from core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelBeanPostProcessor.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultCamelBeanPostProcessor.java
index 51a715e..0e7003b 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelBeanPostProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultCamelBeanPostProcessor.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
similarity index 78%
rename from core/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
index a64cd54..e1a943c 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.HashMap;
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
@@ -32,15 +31,9 @@ import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.engine.EventDrivenConsumerRoute;
-import org.apache.camel.model.PropertyDefinition;
-import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
-import org.apache.camel.processor.CamelInternalProcessorAdvice;
-import org.apache.camel.processor.ContractAdvice;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.processor.Pipeline;
-import org.apache.camel.reifier.rest.RestBindingReifier;
-import org.apache.camel.spi.Contract;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.RouteContext;
@@ -53,7 +46,7 @@ import org.apache.camel.support.CamelContextHelper;
* The context used to activate new routing rules
*/
public class DefaultRouteContext implements RouteContext {
- private final RouteDefinition route;
+ private final NamedNode route;
private final String routeId;
private Route runtimeRoute;
private Endpoint endpoint;
@@ -77,11 +70,15 @@ public class DefaultRouteContext implements RouteContext {
private RouteController routeController;
private final Map<String, Processor> onCompletions = new HashMap<>();
private final Map<String, Processor> onExceptions = new HashMap<>();
+ private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>();
+ private final Map<String, Object> properties = new HashMap<>();
+ private ErrorHandlerFactory errorHandlerFactory;
+ private Integer startupOrder;
- public DefaultRouteContext(CamelContext camelContext, RouteDefinition route) {
+ public DefaultRouteContext(CamelContext camelContext, NamedNode route, String routeId) {
this.camelContext = camelContext;
this.route = route;
- this.routeId = route.idOrCreate(camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
+ this.routeId = routeId;
}
public Endpoint getEndpoint() {
@@ -188,79 +185,18 @@ public class DefaultRouteContext implements RouteContext {
// wrap in JMX instrumentation processor that is used for performance stats
if (managementInterceptStrategy != null) {
- internal.addAdvice(CamelInternalProcessorAdvice.wrap(managementInterceptStrategy.createProcessor("route")));
+ internal.addAdvice(CamelInternalProcessor.wrap(managementInterceptStrategy.createProcessor("route")));
}
// wrap in route lifecycle
internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
// wrap in REST binding
- if (route.getRestBindingDefinition() != null) {
- try {
- internal.addAdvice(new RestBindingReifier(route.getRestBindingDefinition()).createRestBindingAdvice(this));
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
-
- // wrap in contract
- if (route.getInputType() != null || route.getOutputType() != null) {
- Contract contract = new Contract();
- if (route.getInputType() != null) {
- contract.setInputType(route.getInputType().getUrn());
- contract.setValidateInput(route.getInputType().isValidate());
- }
- if (route.getOutputType() != null) {
- contract.setOutputType(route.getOutputType().getUrn());
- contract.setValidateOutput(route.getOutputType().isValidate());
- }
- internal.addAdvice(new ContractAdvice(contract));
- // make sure to enable data type as its in use when using input/output types on routes
- camelContext.setUseDataType(true);
- }
+ advices.forEach(internal::addAdvice);
// and create the route that wraps the UoW
Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
- edcr.getProperties().put(Route.ID_PROPERTY, routeId);
- edcr.getProperties().put(Route.CUSTOM_ID_PROPERTY, route.hasCustomIdAssigned() ? "true" : "false");
- edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
- edcr.getProperties().put(Route.DESCRIPTION_PROPERTY, route.getDescriptionText());
- if (route.getGroup() != null) {
- edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
- }
- String rest = "false";
- if (route.isRest() != null && route.isRest()) {
- rest = "true";
- }
- edcr.getProperties().put(Route.REST_PROPERTY, rest);
-
- List<PropertyDefinition> properties = route.getRouteProperties();
- if (properties != null) {
- final String[] reservedProperties = new String[] {
- Route.ID_PROPERTY,
- Route.PARENT_PROPERTY,
- Route.GROUP_PROPERTY,
- Route.REST_PROPERTY,
- Route.DESCRIPTION_PROPERTY
- };
-
- for (PropertyDefinition prop : properties) {
- try {
- final String key = CamelContextHelper.parseText(camelContext, prop.getKey());
- final String val = CamelContextHelper.parseText(camelContext, prop.getValue());
-
- for (String property : reservedProperties) {
- if (property.equalsIgnoreCase(key)) {
- throw new IllegalArgumentException("Cannot set route property " + property + " as it is a reserved property");
- }
- }
-
- edcr.getProperties().put(key, val);
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
- }
+ edcr.getProperties().putAll(properties);
// after the route is created then set the route on the policy processor so we get hold of it
CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
@@ -419,14 +355,22 @@ public class DefaultRouteContext implements RouteContext {
return true;
}
+ public void setStartupOrder(Integer startupOrder) {
+ this.startupOrder = startupOrder;
+ }
+
@Override
public Integer getStartupOrder() {
- return route.getStartupOrder();
+ return startupOrder;
+ }
+
+ public void setErrorHandlerFactory(ErrorHandlerFactory errorHandlerFactory) {
+ this.errorHandlerFactory = errorHandlerFactory;
}
@Override
public ErrorHandlerFactory getErrorHandlerFactory() {
- return route.getErrorHandlerFactory();
+ return errorHandlerFactory;
}
public void setShutdownRoute(ShutdownRoute shutdownRoute) {
@@ -512,4 +456,13 @@ public class DefaultRouteContext implements RouteContext {
public void setOnException(String onExceptionId, Processor processor) {
onExceptions.put(onExceptionId, processor);
}
+
+ public void addAdvice(CamelInternalProcessorAdvice<?> advice) {
+ advices.add(advice);
+ }
+
+ public void addProperty(String key, Object value) {
+ properties.put(key, value);
+ }
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 519bacd..ba7fd2e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -36,7 +36,9 @@ import org.apache.camel.StreamCache;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
@@ -110,7 +112,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
*/
public <T> T getAdvice(Class<T> type) {
for (CamelInternalProcessorAdvice task : advices) {
- Object advice = CamelInternalProcessorAdvice.unwrap(task);
+ Object advice = unwrap(task);
if (type.isInstance(advice)) {
return type.cast(advice);
}
@@ -735,4 +737,53 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
// noop
}
}
+
+ /**
+ * Wrap an InstrumentationProcessor into a CamelInternalProcessorAdvice
+ */
+ public static <T> CamelInternalProcessorAdvice<T> wrap(InstrumentationProcessor<T> instrumentationProcessor) {
+
+ if (instrumentationProcessor instanceof CamelInternalProcessor) {
+ return (CamelInternalProcessorAdvice<T>) instrumentationProcessor;
+ } else {
+ return new CamelInternalProcessorAdviceWrapper<T>(instrumentationProcessor);
+ }
+ }
+
+ public static Object unwrap(CamelInternalProcessorAdvice<?> advice) {
+ if (advice instanceof CamelInternalProcessorAdviceWrapper) {
+ return ((CamelInternalProcessorAdviceWrapper) advice).unwrap();
+ } else {
+ return advice;
+ }
+ }
+
+ static class CamelInternalProcessorAdviceWrapper<T> implements CamelInternalProcessorAdvice<T>, Ordered {
+
+ final InstrumentationProcessor<T> instrumentationProcessor;
+
+ public CamelInternalProcessorAdviceWrapper(InstrumentationProcessor<T> instrumentationProcessor) {
+ this.instrumentationProcessor = instrumentationProcessor;
+ }
+
+ InstrumentationProcessor<T> unwrap() {
+ return instrumentationProcessor;
+ }
+
+ @Override
+ public int getOrder() {
+ return instrumentationProcessor.getOrder();
+ }
+
+ @Override
+ public T before(Exchange exchange) throws Exception {
+ return instrumentationProcessor.before(exchange);
+ }
+
+ @Override
+ public void after(Exchange exchange, T data) throws Exception {
+ instrumentationProcessor.after(exchange, data);
+ }
+ }
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ContractAdvice.java b/core/camel-base/src/main/java/org/apache/camel/processor/ContractAdvice.java
index 5e11b37..435cee9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ContractAdvice.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ContractAdvice.java
@@ -20,6 +20,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.ValidationException;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.Contract;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.DataTypeAware;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java b/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java
index e68d7c3..4a7ee4f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java
@@ -27,6 +27,7 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.DataTypeAware;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index ac698d4..0cf2654 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index dad1661..1f4e396 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -30,6 +30,7 @@ import org.apache.camel.impl.converter.DefaultTypeConverter;
import org.apache.camel.impl.engine.BeanProcessorFactoryResolver;
import org.apache.camel.impl.engine.BeanProxyFactoryResolver;
import org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager;
+import org.apache.camel.impl.engine.DefaultCamelBeanPostProcessor;
import org.apache.camel.impl.engine.DefaultCamelContextNameStrategy;
import org.apache.camel.impl.engine.DefaultClassResolver;
import org.apache.camel.impl.engine.DefaultComponentResolver;
@@ -81,7 +82,6 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.Registry;
-import org.apache.camel.spi.RestRegistry;
import org.apache.camel.spi.RestRegistryFactory;
import org.apache.camel.spi.RouteController;
import org.apache.camel.spi.ShutdownStrategy;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
index 532fd68..b9eb1a1 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
@@ -16,539 +16,44 @@
*/
package org.apache.camel.impl;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.NamedNode;
-import org.apache.camel.StaticService;
+import org.apache.camel.impl.engine.BaseExecutorServiceManager;
import org.apache.camel.model.OptionalIdentifiedDefinition;
-import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.ProcessorDefinitionHelper;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.ExecutorServiceManager;
-import org.apache.camel.spi.LifecycleStrategy;
-import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.ThreadPoolProfile;
-import org.apache.camel.support.DefaultThreadPoolFactory;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StopWatch;
-import org.apache.camel.util.StringHelper;
-import org.apache.camel.util.TimeUtils;
-import org.apache.camel.util.URISupport;
-import org.apache.camel.util.concurrent.CamelThreadFactory;
-import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
-import org.apache.camel.util.concurrent.ThreadHelper;
-import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
- *
- */
-public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);
-
- private final CamelContext camelContext;
- private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
- private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<>();
- private String threadNamePattern;
- private long shutdownAwaitTermination = 10000;
- private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
- private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<>();
- private ThreadPoolProfile defaultProfile;
+public class DefaultExecutorServiceManager extends BaseExecutorServiceManager {
public DefaultExecutorServiceManager(CamelContext camelContext) {
- this.camelContext = camelContext;
-
- defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
- defaultProfile.setDefaultProfile(true);
- defaultProfile.setPoolSize(10);
- defaultProfile.setMaxPoolSize(20);
- defaultProfile.setKeepAliveTime(60L);
- defaultProfile.setTimeUnit(TimeUnit.SECONDS);
- defaultProfile.setMaxQueueSize(1000);
- defaultProfile.setAllowCoreThreadTimeOut(false);
- defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
-
- registerThreadPoolProfile(defaultProfile);
- }
-
- @Override
- public ThreadPoolFactory getThreadPoolFactory() {
- return threadPoolFactory;
- }
-
- @Override
- public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
- this.threadPoolFactory = threadPoolFactory;
- }
-
- @Override
- public void registerThreadPoolProfile(ThreadPoolProfile profile) {
- ObjectHelper.notNull(profile, "profile");
- StringHelper.notEmpty(profile.getId(), "id", profile);
- threadPoolProfiles.put(profile.getId(), profile);
- }
-
- @Override
- public ThreadPoolProfile getThreadPoolProfile(String id) {
- return threadPoolProfiles.get(id);
- }
-
- @Override
- public ThreadPoolProfile getDefaultThreadPoolProfile() {
- return getThreadPoolProfile(defaultThreadPoolProfileId);
- }
-
- @Override
- public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
- threadPoolProfiles.remove(defaultThreadPoolProfileId);
- defaultThreadPoolProfile.addDefaults(defaultProfile);
-
- LOG.info("Using custom DefaultThreadPoolProfile: {}", defaultThreadPoolProfile);
-
- this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
- defaultThreadPoolProfile.setDefaultProfile(true);
- registerThreadPoolProfile(defaultThreadPoolProfile);
- }
-
- @Override
- public String getThreadNamePattern() {
- return threadNamePattern;
- }
-
- @Override
- public void setThreadNamePattern(String threadNamePattern) {
- // must set camel id here in the pattern and let the other placeholders be resolved on demand
- this.threadNamePattern = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
- }
-
- @Override
- public long getShutdownAwaitTermination() {
- return shutdownAwaitTermination;
- }
-
- @Override
- public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
- this.shutdownAwaitTermination = shutdownAwaitTermination;
- }
-
- @Override
- public String resolveThreadName(String name) {
- return ThreadHelper.resolveThreadName(threadNamePattern, name);
- }
-
- @Override
- public Thread newThread(String name, Runnable runnable) {
- ThreadFactory factory = createThreadFactory(name, true);
- return factory.newThread(runnable);
- }
-
- @Override
- public ExecutorService newDefaultThreadPool(Object source, String name) {
- return newThreadPool(source, name, getDefaultThreadPoolProfile());
- }
-
- @Override
- public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
- return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
- }
-
- @Override
- public ExecutorService newThreadPool(Object source, String name, String profileId) {
- ThreadPoolProfile profile = getThreadPoolProfile(profileId);
- if (profile != null) {
- return newThreadPool(source, name, profile);
- } else {
- // no profile with that id
- return null;
- }
+ super(camelContext);
}
@Override
public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) {
- String sanitizedName = URISupport.sanitizeUri(name);
- ObjectHelper.notNull(profile, "ThreadPoolProfile");
-
- ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
- profile.addDefaults(defaultProfile);
-
- ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
- ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
- onThreadPoolCreated(executorService, source, profile.getId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", source, sanitizedName, executorService);
- }
-
- return executorService;
- }
-
- @Override
- public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
- ThreadPoolProfile profile = new ThreadPoolProfile(name);
- profile.setPoolSize(poolSize);
- profile.setMaxPoolSize(maxPoolSize);
- return newThreadPool(source, name, profile);
- }
-
- @Override
- public ExecutorService newSingleThreadExecutor(Object source, String name) {
- return newFixedThreadPool(source, name, 1);
+ return super.newThreadPool(forceId(source), name, profile);
}
@Override
public ExecutorService newCachedThreadPool(Object source, String name) {
- String sanitizedName = URISupport.sanitizeUri(name);
- ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
- onThreadPoolCreated(answer, source, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", source, sanitizedName, answer);
- }
- return answer;
+ return super.newCachedThreadPool(forceId(source), name);
}
@Override
- public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
- ThreadPoolProfile profile = new ThreadPoolProfile(name);
- profile.setPoolSize(poolSize);
- profile.setMaxPoolSize(poolSize);
- profile.setKeepAliveTime(0L);
- return newThreadPool(source, name, profile);
- }
-
- @Override
- public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
- return newScheduledThreadPool(source, name, 1);
- }
-
- @Override
public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
- String sanitizedName = URISupport.sanitizeUri(name);
- profile.addDefaults(getDefaultThreadPoolProfile());
- ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true));
- onThreadPoolCreated(answer, source, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new ScheduledThreadPool for source: {} with name: {} -> {}", source, sanitizedName, answer);
- }
- return answer;
+ return super.newScheduledThreadPool(forceId(source), name, profile);
}
- @Override
- public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) {
- ThreadPoolProfile profile = getThreadPoolProfile(profileId);
- if (profile != null) {
- return newScheduledThreadPool(source, name, profile);
- } else {
- // no profile with that id
- return null;
+ protected Object forceId(Object source) {
+ if (source instanceof OptionalIdentifiedDefinition) {
+ NodeIdFactory factory = getCamelContext().adapt(ExtendedCamelContext.class).getNodeIdFactory();
+ ((OptionalIdentifiedDefinition) source).idOrCreate(factory);
}
- }
-
- @Override
- public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
- ThreadPoolProfile profile = new ThreadPoolProfile(name);
- profile.setPoolSize(poolSize);
- return newScheduledThreadPool(source, name, profile);
- }
-
- @Override
- public void shutdown(ExecutorService executorService) {
- doShutdown(executorService, 0, false);
- }
-
- @Override
- public void shutdownGraceful(ExecutorService executorService) {
- doShutdown(executorService, getShutdownAwaitTermination(), false);
- }
-
- @Override
- public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
- doShutdown(executorService, shutdownAwaitTermination, false);
- }
-
- private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) {
- if (executorService == null) {
- return false;
- }
-
- boolean warned = false;
-
- // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
- // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
- // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
- // we ought to shutdown much faster)
- if (!executorService.isShutdown()) {
- StopWatch watch = new StopWatch();
-
- LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
- executorService.shutdown();
-
- if (shutdownAwaitTermination > 0) {
- try {
- if (!awaitTermination(executorService, shutdownAwaitTermination)) {
- warned = true;
- LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
- executorService.shutdownNow();
- // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
- if (!awaitTermination(executorService, shutdownAwaitTermination)) {
- LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
- }
- }
- } catch (InterruptedException e) {
- warned = true;
- LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
- // we were interrupted during shutdown, so force shutdown
- executorService.shutdownNow();
- }
- }
-
- // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
- if (warned) {
- LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
- executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken()));
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
- executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken()));
- }
- }
-
- // let lifecycle strategy be notified as well which can let it be managed in JMX as well
- ThreadPoolExecutor threadPool = null;
- if (executorService instanceof ThreadPoolExecutor) {
- threadPool = (ThreadPoolExecutor) executorService;
- } else if (executorService instanceof SizedScheduledExecutorService) {
- threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
- }
- if (threadPool != null) {
- for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
- lifecycle.onThreadPoolRemove(camelContext, threadPool);
- }
- }
-
- // remove reference as its shutdown (do not remove if fail-safe)
- if (!failSafe) {
- executorServices.remove(executorService);
- }
-
- return warned;
- }
-
- @Override
- public List<Runnable> shutdownNow(ExecutorService executorService) {
- return doShutdownNow(executorService, false);
- }
-
- private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
- ObjectHelper.notNull(executorService, "executorService");
-
- List<Runnable> answer = null;
- if (!executorService.isShutdown()) {
- if (failSafe) {
- // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
- LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
- } else {
- LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
- }
- answer = executorService.shutdownNow();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
- executorService, executorService.isShutdown(), executorService.isTerminated());
- }
- }
-
- // let lifecycle strategy be notified as well which can let it be managed in JMX as well
- ThreadPoolExecutor threadPool = null;
- if (executorService instanceof ThreadPoolExecutor) {
- threadPool = (ThreadPoolExecutor) executorService;
- } else if (executorService instanceof SizedScheduledExecutorService) {
- threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
- }
- if (threadPool != null) {
- for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
- lifecycle.onThreadPoolRemove(camelContext, threadPool);
- }
- }
-
- // remove reference as its shutdown (do not remove if fail-safe)
- if (!failSafe) {
- executorServices.remove(executorService);
- }
-
- return answer;
- }
-
- @Override
- public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
- // log progress every 2nd second so end user is aware of we are shutting down
- StopWatch watch = new StopWatch();
- long interval = Math.min(2000, shutdownAwaitTermination);
- boolean done = false;
- while (!done && interval > 0) {
- if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
- done = true;
- } else {
- LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
- // recalculate interval
- interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
- }
- }
-
- return done;
- }
-
- /**
- * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
- *
- * @param executorService the created {@link java.util.concurrent.ExecutorService}
- */
- protected void onNewExecutorService(ExecutorService executorService) {
- // noop
- }
-
- @Override
- protected void doStart() throws Exception {
- if (threadNamePattern == null) {
- // set default name pattern which includes the camel context name
- threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#";
- }
- }
-
- @Override
- protected void doStop() throws Exception {
- // noop
- }
-
- @Override
- protected void doShutdown() throws Exception {
- // shutdown all remainder executor services by looping and doing this aggressively
- // as by normal all threads pool should have been shutdown using proper lifecycle
- // by their EIPs, components etc. This is acting as a fail-safe during shutdown
- // of CamelContext itself.
- Set<ExecutorService> forced = new LinkedHashSet<>();
- if (!executorServices.isEmpty()) {
- // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also
- LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size());
- for (ExecutorService executorService : executorServices) {
- try {
- boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true);
- // remember the thread pools that was forced to shutdown (eg warned)
- if (warned) {
- forced.add(executorService);
- }
- } catch (Throwable e) {
- // only log if something goes wrong as we want to shutdown them all
- LOG.warn("Error occurred during shutdown of ExecutorService: "
- + executorService + ". This exception will be ignored.", e);
- }
- }
- }
-
- // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his
- if (!forced.isEmpty()) {
- LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size());
- for (ExecutorService executorService : forced) {
- LOG.warn(" forced -> {}", executorService);
- }
- }
- forced.clear();
-
- // clear list
- executorServices.clear();
-
- // do not clear the default profile as we could potential be restarted
- Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator();
- while (it.hasNext()) {
- ThreadPoolProfile profile = it.next();
- if (!profile.isDefaultProfile()) {
- it.remove();
- }
- }
- }
-
- /**
- * Invoked when a new thread pool is created.
- * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext,
- * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method,
- * which for example will enlist the thread pool in JMX management.
- *
- * @param executorService the thread pool
- * @param source the source to use the thread pool
- * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile
- */
- private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) {
- // add to internal list of thread pools
- executorServices.add(executorService);
-
- String id;
- String sourceId = null;
- String routeId = null;
-
- // extract id from source
- if (source instanceof NamedNode) {
- id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
- // and let source be the short name of the pattern
- sourceId = ((NamedNode) source).getShortName();
- } else if (source instanceof String) {
- id = (String) source;
- } else if (source != null) {
- if (source instanceof StaticService) {
- // the source is static service so its name would be unique
- id = source.getClass().getSimpleName();
- } else {
- // fallback and use the simple class name with hashcode for the id so its unique for this given source
- id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
- }
- } else {
- // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique
- id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")";
- }
-
- // id is mandatory
- StringHelper.notEmpty(id, "id for thread pool " + executorService);
-
- // extract route id if possible
- if (source instanceof ProcessorDefinition) {
- RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
- if (route != null) {
- routeId = route.idOrCreate(this.camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
- }
- }
-
- // let lifecycle strategy be notified as well which can let it be managed in JMX as well
- ThreadPoolExecutor threadPool = null;
- if (executorService instanceof ThreadPoolExecutor) {
- threadPool = (ThreadPoolExecutor) executorService;
- } else if (executorService instanceof SizedScheduledExecutorService) {
- threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
- }
- if (threadPool != null) {
- for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
- lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
- }
- }
-
- // now call strategy to allow custom logic
- onNewExecutorService(executorService);
- }
-
- private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
- return new CamelThreadFactory(threadNamePattern, name, isDaemon);
+ return source;
}
}
+
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultModel.java
index 6da696f..af12707 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultModel.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -31,6 +31,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToStartRouteException;
import org.apache.camel.Route;
import org.apache.camel.impl.engine.AbstractCamelContext;
+import org.apache.camel.impl.engine.DefaultRouteContext;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
@@ -300,7 +301,8 @@ public class DefaultModel implements Model {
AbstractCamelContext mcc = camelContext.adapt(AbstractCamelContext.class);
mcc.setStartingRoutes(true);
try {
- RouteContext routeContext = new DefaultRouteContext(camelContext, routeDefinition);
+ String id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory());
+ RouteContext routeContext = new DefaultRouteContext(camelContext, routeDefinition, id);
Route route = new RouteReifier(routeDefinition).createRoute(camelContext, routeContext);
RouteService routeService = new RouteService(route);
mcc.startRouteService(routeService, true);
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 6f6517c..997b2cd 100644
--- a/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -35,7 +35,7 @@ import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RouteDefinitionHelper;
import org.apache.camel.processor.CamelInternalProcessor;
-import org.apache.camel.processor.CamelInternalProcessorAdvice;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.processor.WrapProcessor;
import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
import org.apache.camel.processor.interceptor.BacklogDebugger;
@@ -290,7 +290,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann
}
if (!redeliveryPossible) {
// optimise to use advice as we cannot redeliver
- addAdvice(CamelInternalProcessorAdvice.wrap(instrumentationProcessor));
+ addAdvice(CamelInternalProcessor.wrap(instrumentationProcessor));
}
}
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 750d753..ae02c5a 100644
--- a/core/camel-core/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -27,6 +27,7 @@ import org.apache.camel.FailedToCreateRouteException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.AdviceWithTask;
import org.apache.camel.builder.RouteBuilder;
@@ -34,9 +35,13 @@ import org.apache.camel.model.Model;
import org.apache.camel.model.ModelHelper;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.PropertyDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RoutesDefinition;
+import org.apache.camel.processor.ContractAdvice;
import org.apache.camel.processor.interceptor.HandleFault;
+import org.apache.camel.reifier.rest.RestBindingReifier;
+import org.apache.camel.spi.Contract;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
@@ -191,6 +196,9 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
// Implementation methods
// -------------------------------------------------------------------------
protected Route doCreateRoute(CamelContext camelContext, RouteContext routeContext) throws Exception {
+ // configure error handler
+ routeContext.setErrorHandlerFactory(definition.getErrorHandlerFactory());
+
// configure tracing
if (definition.getTrace() != null) {
Boolean isTrace = CamelContextHelper.parseBoolean(camelContext, definition.getTrace());
@@ -297,6 +305,11 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
routeContext.setAutoStartup(isAutoStartup);
}
+ // configure startup order
+ if (definition.getStartupOrder() != null) {
+ routeContext.setStartupOrder(definition.getStartupOrder());
+ }
+
// configure shutdown
if (definition.getShutdownRoute() != null) {
log.debug("Using ShutdownRoute {} on route: {}", definition.getShutdownRoute(), definition.getId());
@@ -339,6 +352,70 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
}
}
+ if (definition.getRestBindingDefinition() != null) {
+ try {
+ routeContext.addAdvice(new RestBindingReifier(definition.getRestBindingDefinition()).createRestBindingAdvice(routeContext));
+ } catch (Exception e) {
+ throw RuntimeCamelException.wrapRuntimeCamelException(e);
+ }
+ }
+
+ // wrap in contract
+ if (definition.getInputType() != null || definition.getOutputType() != null) {
+ Contract contract = new Contract();
+ if (definition.getInputType() != null) {
+ contract.setInputType(definition.getInputType().getUrn());
+ contract.setValidateInput(definition.getInputType().isValidate());
+ }
+ if (definition.getOutputType() != null) {
+ contract.setOutputType(definition.getOutputType().getUrn());
+ contract.setValidateOutput(definition.getOutputType().isValidate());
+ }
+ routeContext.addAdvice(new ContractAdvice(contract));
+ // make sure to enable data type as its in use when using input/output types on routes
+ camelContext.setUseDataType(true);
+ }
+
+ // Set route properties
+ routeContext.addProperty(Route.ID_PROPERTY, definition.getId());
+ routeContext.addProperty(Route.CUSTOM_ID_PROPERTY, definition.hasCustomIdAssigned() ? "true" : "false");
+ routeContext.addProperty(Route.PARENT_PROPERTY, Integer.toHexString(definition.hashCode()));
+ routeContext.addProperty(Route.DESCRIPTION_PROPERTY, definition.getDescriptionText());
+ if (definition.getGroup() != null) {
+ routeContext.addProperty(Route.GROUP_PROPERTY, definition.getGroup());
+ }
+ String rest = Boolean.toString(definition.isRest() != null && definition.isRest());
+ routeContext.addProperty(Route.REST_PROPERTY, rest);
+
+ List<PropertyDefinition> properties = definition.getRouteProperties();
+ if (properties != null) {
+ final String[] reservedProperties = new String[] {
+ Route.ID_PROPERTY,
+ Route.CUSTOM_ID_PROPERTY,
+ Route.PARENT_PROPERTY,
+ Route.DESCRIPTION_PROPERTY,
+ Route.GROUP_PROPERTY,
+ Route.REST_PROPERTY
+ };
+
+ for (PropertyDefinition prop : properties) {
+ try {
+ final String key = CamelContextHelper.parseText(camelContext, prop.getKey());
+ final String val = CamelContextHelper.parseText(camelContext, prop.getValue());
+
+ for (String property : reservedProperties) {
+ if (property.equalsIgnoreCase(key)) {
+ throw new IllegalArgumentException("Cannot set route property " + property + " as it is a reserved property");
+ }
+ }
+
+ routeContext.addProperty(key, val);
+ } catch (Exception e) {
+ throw RuntimeCamelException.wrapRuntimeCamelException(e);
+ }
+ }
+ }
+
return routeContext.commit();
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePredicateTest.java
similarity index 98%
rename from core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
rename to core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePredicateTest.java
index 2605b2b..ea2edbd 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePredicateTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.reflect.Method;
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePropertyTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePropertyTest.java
similarity index 99%
rename from core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePropertyTest.java
rename to core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePropertyTest.java
index e7e9fe0..9aaee7b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePropertyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperConsumePropertyTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.reflect.Method;
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperSedaConsumePredicateTest.java
similarity index 98%
rename from core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
rename to core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperSedaConsumePredicateTest.java
index cedd4e8..39b0634 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperSedaConsumePredicateTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.reflect.Method;
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java
similarity index 99%
rename from core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
rename to core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java
index 52fd0fd..745d72d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -38,6 +38,8 @@ import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.FooBar;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.support.SynchronizationAdapter;
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
similarity index 95%
rename from core/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
rename to core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
index c571425..f9995a0 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.impl.engine;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
@@ -36,7 +36,7 @@ public class CustomThreadPoolFactoryTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
- DefaultExecutorServiceManager executorServiceManager = new DefaultExecutorServiceManager(context);
+ BaseExecutorServiceManager executorServiceManager = new BaseExecutorServiceManager(context);
executorServiceManager.setThreadPoolFactory(factory);
context.setExecutorServiceManager(executorServiceManager);
return context;