You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/12 11:26:38 UTC
[camel] 34/44: Isolate o.a.c.management from o.a.c.impl
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 387216d38b6b4ae3d300db7e4c23be40200caea7
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Oct 5 15:24:27 2018 +0200
Isolate o.a.c.management from o.a.c.impl
---
.../main/java/org/apache/camel/CamelContext.java | 5 ++
.../src/main/java/org/apache/camel/Consumer.java | 3 +
.../src/main/java/org/apache/camel/Route.java | 7 +++
.../camel}/cluster/ClusterServiceHelper.java | 3 +-
.../camel}/cluster/ClusterServiceSelectors.java | 3 +-
.../org/apache/camel/support/ServiceHelper.java | 4 +-
.../org/apache/camel/builder/RouteBuilder.java | 21 -------
.../camel/component/seda/SedaPollingConsumer.java | 6 ++
.../impl/cluster/ClusteredRouteController.java | 2 +
.../camel/impl/cluster/ClusteredRoutePolicy.java | 4 +-
.../impl/cluster/ClusteredRoutePolicyFactory.java | 1 +
.../DefaultManagementLifecycleStrategy.java | 64 +++++++++++-----------
.../InstrumentationInterceptStrategy.java | 13 ++---
.../management/mbean/ManagedClusterService.java | 2 +-
.../ManagedThrottlingExceptionRoutePolicy.java | 4 +-
.../ManagedThrottlingInflightRoutePolicy.java | 2 +-
.../camel/support/EventDrivenPollingConsumer.java | 5 ++
.../camel/support/PollingConsumerSupport.java | 6 ++
.../camel/support/ProcessorPollingConsumer.java | 5 ++
.../apache/camel/support/RoutePolicySupport.java | 41 ++++----------
.../ThrottlingExceptionHalfOpenHandler.java | 2 +-
.../ThrottlingExceptionRoutePolicy.java | 4 +-
.../ThrottlingInflightRoutePolicy.java | 5 +-
.../camel/component/test/TestEndpointTest.java | 5 ++
.../impl/cluster/ClusterServiceSelectorTest.java | 5 +-
.../ManagedThrottlingExceptionRoutePolicyTest.java | 4 +-
.../ManagedThrottlingInflightRoutePolicyTest.java | 2 +-
...xceptionRoutePolicyHalfOpenHandlerSedaTest.java | 4 +-
...ingExceptionRoutePolicyHalfOpenHandlerTest.java | 4 +-
...ThrottlingExceptionRoutePolicyHalfOpenTest.java | 2 +-
...lingExceptionRoutePolicyKeepOpenOnInitTest.java | 2 +-
...tlingExceptionRoutePolicyOpenViaConfigTest.java | 2 +-
.../ThrottlingExceptionRoutePolicyTest.java | 4 +-
.../ThrottlingInflightRoutePolicyTest.java | 2 +-
.../component/disruptor/DisruptorConsumer.java | 5 ++
.../jms/JmsThrottlingInflightRoutePolicyTest.java | 2 +-
.../tx/JMSTransactionThrottlingRoutePolicyTest.xml | 2 +-
.../camel/component/master/MasterComponent.java | 2 +-
.../quartz2/MultiplePoliciesOnRouteTest.java | 2 +-
.../camel/routepolicy/quartz2/MultiplePolicies.xml | 2 +-
.../BackpressurePublisherRoutePolicyTest.java | 2 +-
.../ThrottlingInflightRoutePolicyTest.xml | 2 +-
.../component/connector/ConnectorProducer.java | 2 +-
.../resources/META-INF/spring/camel-server.xml | 2 +-
44 files changed, 138 insertions(+), 133 deletions(-)
diff --git a/camel-api/src/main/java/org/apache/camel/CamelContext.java b/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 741475c..704624b 100644
--- a/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -179,6 +179,11 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
String getManagementName();
/**
+ * Sets the name this {@link CamelContext} will be registered in JMX.
+ */
+ void setManagementName(String name);
+
+ /**
* Gets the version of the this CamelContext.
*
* @return the version
diff --git a/camel-api/src/main/java/org/apache/camel/Consumer.java b/camel-api/src/main/java/org/apache/camel/Consumer.java
index 89def42..825fe6a 100644
--- a/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -20,4 +20,7 @@ package org.apache.camel;
* A consumer of message exchanges from an {@link Endpoint}
*/
public interface Consumer extends Service, EndpointAware {
+
+ Processor getProcessor();
+
}
diff --git a/camel-api/src/main/java/org/apache/camel/Route.java b/camel-api/src/main/java/org/apache/camel/Route.java
index 53fddce..23c46e0 100644
--- a/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/camel-api/src/main/java/org/apache/camel/Route.java
@@ -74,6 +74,13 @@ public interface Route extends EndpointAware {
Consumer getConsumer();
/**
+ * Gets the {@link Processor}
+ *
+ * @return the processor
+ */
+ Processor getProcessor();
+
+ /**
* Whether or not the route supports suspension (suspend and resume)
*
* @return <tt>true</tt> if this route supports suspension
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
similarity index 95%
rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java
rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
index 22b7706..1571e0f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java
+++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl.cluster;
+package org.apache.camel.cluster;
import java.util.Optional;
import java.util.Set;
import org.apache.camel.CamelContext;
-import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.util.ObjectHelper;
public final class ClusterServiceHelper {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
similarity index 98%
rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java
rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
index aa86a5e..485f648 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java
+++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl.cluster;
+package org.apache.camel.cluster;
import java.util.Collection;
import java.util.Comparator;
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
index 9ef50b6..111eb37 100644
--- a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
+++ b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
@@ -268,10 +268,10 @@ public final class ServiceHelper {
*
* @param service the service
* @return <tt>true</tt> if either <tt>resume</tt> method or
- * {@link #startService(Service)} was called, <tt>false</tt>
+ * {@link #startService(Object)} was called, <tt>false</tt>
* otherwise.
* @throws Exception is thrown if error occurred
- * @see #startService(Service)
+ * @see #startService(Object)
*/
public static boolean resumeService(Object service) throws Exception {
if (service instanceof Suspendable && service instanceof SuspendableService) {
diff --git a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
index 940c4d7..d5349f2 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
@@ -27,7 +27,6 @@ import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.InterceptDefinition;
import org.apache.camel.model.InterceptFromDefinition;
@@ -366,17 +365,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild
return getRouteCollection().onCompletion();
}
- // Properties
- // -----------------------------------------------------------------------
- public ModelCamelContext getContext() {
- ModelCamelContext context = super.getContext();
- if (context == null) {
- context = createContainer();
- setContext(context);
- }
- return context;
- }
-
public void addRoutesToCamelContext(CamelContext context) throws Exception {
// must configure routes before rests
configureRoutes((ModelCamelContext) context);
@@ -568,15 +556,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild
return this.routeCollection;
}
- /**
- * Factory method
- *
- * @return the CamelContext
- */
- protected ModelCamelContext createContainer() {
- return new DefaultCamelContext();
- }
-
protected void configureRest(RestDefinition rest) {
// noop
}
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
index ae8ae38..6abbd5d 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.IsSingleton;
+import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.PollingConsumerSupport;
@@ -36,6 +37,11 @@ public class SedaPollingConsumer extends PollingConsumerSupport implements IsSin
}
@Override
+ public Processor getProcessor() {
+ return null;
+ }
+
+ @Override
public Exchange receive() {
try {
return getEndpoint().getQueue().take();
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
index aae4459..332a4fe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
@@ -35,11 +35,13 @@ import org.apache.camel.NamedNode;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.cluster.ClusterServiceSelectors;
import org.apache.camel.impl.DefaultRouteController;
import org.apache.camel.meta.Experimental;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.cluster.ClusterServiceHelper;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
index 98420ab..29a7d67 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
@@ -37,14 +37,14 @@ import org.apache.camel.cluster.CamelClusterEventListener;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.ClusterServiceSelectors;
import org.apache.camel.management.event.CamelContextStartedEvent;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.cluster.ClusterServiceHelper;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@ManagedResource(description = "Clustered Route policy using")
public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
index 9ff0e2f..ff058a2 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
@@ -20,6 +20,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.cluster.ClusterServiceSelectors;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.util.ObjectHelper;
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 5806f35..1971782 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -38,6 +38,7 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.ManagementStatisticsLevel;
+import org.apache.camel.NamedNode;
import org.apache.camel.NonManagedService;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -48,11 +49,8 @@ import org.apache.camel.StartupListener;
import org.apache.camel.TimerListener;
import org.apache.camel.VetoCamelContextStartException;
import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultEndpointRegistry;
-import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
import org.apache.camel.management.mbean.ManagedBacklogDebugger;
import org.apache.camel.management.mbean.ManagedBacklogTracer;
@@ -87,6 +85,7 @@ import org.apache.camel.runtimecatalog.RuntimeCamelCatalog;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ConsumerCache;
import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.EndpointRegistry;
import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.LifecycleStrategy;
@@ -118,7 +117,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
// the wrapped processors is for performance counters, which are in use for the created routes
// when a route is removed, we should remove the associated processors from this map
- private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
+ private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
private final List<PreRegisterService> preServices = new ArrayList<>();
private final TimerListenerManager loadTimer = new ManagedLoadTimer();
private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
@@ -149,7 +148,11 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
String name = context.getName();
- String managementName = context.getManagementNameStrategy().getName();
+ String managementName = context.getManagementName();
+
+ if (managementName == null) {
+ managementName = context.getManagementNameStrategy().getName();
+ }
try {
boolean done = false;
@@ -189,9 +192,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
}
// set the name we are going to use
- if (context instanceof DefaultCamelContext) {
- ((DefaultCamelContext) context).setManagementName(managementName);
- }
+ context.setManagementName(managementName);
try {
manageObject(mc);
@@ -503,8 +504,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
answer = new ManagedConsumerCache(context, (ConsumerCache) service);
} else if (service instanceof ProducerCache) {
answer = new ManagedProducerCache(context, (ProducerCache) service);
- } else if (service instanceof DefaultEndpointRegistry) {
- answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
+ } else if (service instanceof EndpointRegistry) {
+ answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
} else if (service instanceof TypeConverterRegistry) {
answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
} else if (service instanceof RestRegistry) {
@@ -545,7 +546,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
// a bit of magic here as the processors we want to manage have already been registered
// in the wrapped processors map when Camel have instrumented the route on route initialization
// so the idea is now to only manage the processors from the map
- KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
+ KeyValueHolder<NamedNode, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
if (holder == null) {
// skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
return null;
@@ -595,22 +596,19 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
// get the wrapped instrumentation processor from this route
// and set me as the counter
- if (route instanceof EventDrivenConsumerRoute) {
- EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
- Processor processor = edcr.getProcessor();
- if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
- CamelInternalProcessor internal = (CamelInternalProcessor) processor;
- ManagedRoute routeMBean = (ManagedRoute) mr;
-
- CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
- if (task != null) {
- // we need to wrap the counter with the camel context so we get stats updated on the context as well
- if (camelContextMBean != null) {
- CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
- task.setCounter(wrapper);
- } else {
- task.setCounter(routeMBean);
- }
+ Processor processor = route.getProcessor();
+ if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
+ CamelInternalProcessor internal = (CamelInternalProcessor) processor;
+ ManagedRoute routeMBean = (ManagedRoute) mr;
+
+ CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
+ if (task != null) {
+ // we need to wrap the counter with the camel context so we get stats updated on the context as well
+ if (camelContextMBean != null) {
+ CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
+ task.setCounter(wrapper);
+ } else {
+ task.setCounter(routeMBean);
}
}
}
@@ -746,7 +744,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
// Create a map (ProcessorType -> PerformanceCounter)
// to be passed to InstrumentationInterceptStrategy.
- Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<>();
+ Map<NamedNode, PerformanceCounter> registeredCounters = new HashMap<>();
// Each processor in a route will have its own performance counter.
// These performance counter will be embedded to InstrumentationProcessor
@@ -775,9 +773,9 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
for (Route route : routes) {
String id = route.getId();
- Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
+ Iterator<KeyValueHolder<NamedNode, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
while (it.hasNext()) {
- KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
+ KeyValueHolder<NamedNode, InstrumentationProcessor> holder = it.next();
RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
if (def != null && id.equals(def.getId())) {
it.remove();
@@ -788,7 +786,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
}
private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
- Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
+ Map<NamedNode, PerformanceCounter> registeredCounters) {
// traverse children if any exists
List<ProcessorDefinition<?>> children = processor.getOutputs();
diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
index 0a1fe79..186fc2c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.util.KeyValueHolder;
@@ -36,21 +35,21 @@ import org.apache.camel.util.KeyValueHolder;
*/
public class InstrumentationInterceptStrategy implements InterceptStrategy {
- private Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters;
- private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors;
+ private Map<NamedNode, PerformanceCounter> registeredCounters;
+ private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors;
- public InstrumentationInterceptStrategy(Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters,
- Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors) {
+ public InstrumentationInterceptStrategy(Map<NamedNode, PerformanceCounter> registeredCounters,
+ Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors) {
this.registeredCounters = registeredCounters;
this.wrappedProcessors = wrappedProcessors;
}
- public PerformanceCounter prepareProcessor(ProcessorDefinition<?> definition, Processor target, InstrumentationProcessor advice) {
+ public PerformanceCounter prepareProcessor(NamedNode definition, Processor target, InstrumentationProcessor advice) {
PerformanceCounter counter = registeredCounters.get(definition);
if (counter != null) {
// add it to the mapping of wrappers so we can later change it to a
// decorated counter when we register the processor
- KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice);
+ KeyValueHolder<NamedNode, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice);
wrappedProcessors.put(target, holder);
}
return counter;
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
index cb4c427..52b8cb5 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
@@ -25,7 +25,7 @@ import org.apache.camel.ServiceStatus;
import org.apache.camel.StatefulService;
import org.apache.camel.api.management.mbean.ManagedClusterServiceMBean;
import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.cluster.ClusterServiceHelper;
+import org.apache.camel.cluster.ClusterServiceHelper;
import org.apache.camel.spi.ManagementStrategy;
public class ManagedClusterService implements ManagedClusterServiceMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
index 8030eae..2f49504d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
@@ -19,8 +19,8 @@ package org.apache.camel.management.mbean;
import org.apache.camel.CamelContext;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
@ManagedResource(description = "Managed ThrottlingExceptionRoutePolicy")
public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implements ManagedThrottlingExceptionRoutePolicyMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
index f8c1c74..7c287e4 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
@@ -20,7 +20,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedThrottlingInflightRoutePolicyMBean;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
@ManagedResource(description = "Managed ThrottlingInflightRoutePolicy")
public class ManagedThrottlingInflightRoutePolicy extends ManagedService implements ManagedThrottlingInflightRoutePolicyMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 4b424bd..17db223 100644
--- a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -67,6 +67,11 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
}
+ @Override
+ public Processor getProcessor() {
+ return this;
+ }
+
public boolean isBlockWhenFull() {
return blockWhenFull;
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index d69a92f..09224fc 100644
--- a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -18,6 +18,7 @@ package org.apache.camel.support;
import org.apache.camel.Endpoint;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
/**
@@ -42,6 +43,11 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P
return endpoint;
}
+ @Override
+ public Processor getProcessor() {
+ return null;
+ }
+
public ExceptionHandler getExceptionHandler() {
return exceptionHandler;
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
index 9893746..17a63f2 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
@@ -37,6 +37,11 @@ public class ProcessorPollingConsumer extends PollingConsumerSupport implements
this.processor = processor;
}
+ @Override
+ public Processor getProcessor() {
+ return processor;
+ }
+
protected void doStart() throws Exception {
ServiceHelper.startService(processor);
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
index a74eea8..90d20e4 100644
--- a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
-import org.apache.camel.Suspendable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RoutePolicy;
@@ -107,21 +106,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route
* @return <tt>true</tt> if the consumer was suspended or stopped, <tt>false</tt> if the consumer was already suspend or stopped
*/
public boolean suspendOrStopConsumer(Consumer consumer) throws Exception {
- if (consumer instanceof Suspendable) {
- boolean suspended = ServiceHelper.suspendService(consumer);
- if (suspended) {
- log.debug("Suspended consumer {}", consumer);
- } else {
- log.trace("Consumer already suspended {}", consumer);
- }
- return suspended;
- }
- if (!ServiceHelper.isStopped(consumer)) {
- ServiceHelper.stopService(consumer);
- log.debug("Stopped consumer {}", consumer);
- return true;
+ boolean suspended = ServiceHelper.suspendService(consumer);
+ if (suspended) {
+ log.debug("Suspended consumer {}", consumer);
+ } else {
+ log.trace("Consumer already suspended {}", consumer);
}
- return false;
+ return suspended;
}
/**
@@ -134,21 +125,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route
* @return <tt>true</tt> if the consumer was resumed or started, <tt>false</tt> if the consumer was already resumed or started
*/
public boolean resumeOrStartConsumer(Consumer consumer) throws Exception {
- if (consumer instanceof Suspendable) {
- boolean resumed = ServiceHelper.resumeService(consumer);
- if (resumed) {
- log.debug("Resumed consumer {}", consumer);
- } else {
- log.trace("Consumer already resumed {}", consumer);
- }
- return resumed;
- }
- if (!ServiceHelper.isStarted(consumer)) {
- ServiceHelper.startService(consumer);
- log.debug("Started consumer {}", consumer);
- return true;
+ boolean resumed = ServiceHelper.resumeService(consumer);
+ if (resumed) {
+ log.debug("Resumed consumer {}", consumer);
+ } else {
+ log.trace("Consumer already resumed {}", consumer);
}
- return false;
+ return resumed;
}
public void startRoute(Route route) throws Exception {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
similarity index 96%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
index 84607e76..7bdb502 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
/**
* Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
similarity index 99%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
index 042559a..86f3dcb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
import java.util.List;
import java.util.Timer;
@@ -30,8 +30,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Modeled after the circuit breaker {@link ThrottlingInflightRoutePolicy}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
similarity index 98%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
index 4947c47..d662c50 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
import java.util.EventObject;
import java.util.LinkedHashSet;
@@ -34,7 +34,6 @@ import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
-import org.slf4j.LoggerFactory;
/**
* A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
@@ -224,7 +223,7 @@ public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements
}
protected CamelLogger createLogger() {
- return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel());
+ return new CamelLogger(log, getLoggingLevel());
}
private int getSize(Route route, Exchange exchange) {
diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
index 5d61765..03c529e 100644
--- a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
@@ -79,6 +79,11 @@ public class TestEndpointTest extends ContextTestSupport {
}
@Override
+ public Processor getProcessor() {
+ return null;
+ }
+
+ @Override
public void start() throws Exception {
// when starting then send a message to the processor
Exchange exchange = new DefaultExchange(getEndpoint());
diff --git a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
index 318e1fc..d0897da 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
@@ -24,13 +24,14 @@ import org.apache.camel.CamelContext;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.ClusterServiceSelectors;
import org.apache.camel.component.file.cluster.FileLockClusterService;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.camel.impl.cluster.ClusterServiceHelper.lookupService;
-import static org.apache.camel.impl.cluster.ClusterServiceHelper.mandatoryLookupService;
+import static org.apache.camel.cluster.ClusterServiceHelper.lookupService;
+import static org.apache.camel.cluster.ClusterServiceHelper.mandatoryLookupService;
public class ClusterServiceSelectorTest {
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
index 222f3b6..aef3ea8 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
@@ -27,8 +27,8 @@ import org.apache.camel.Processor;
import org.apache.camel.ServiceStatus;
import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.junit.Test;
public class ManagedThrottlingExceptionRoutePolicyTest extends ManagementTestSupport {
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
index 43141da..2cb25ae 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
@@ -23,7 +23,7 @@ import javax.management.ObjectName;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.junit.Test;
public class ManagedThrottlingInflightRoutePolicyTest extends ManagementTestSupport {
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
index 44b89c5..fd66596 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.apache.camel.support.ServiceSupport;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
index 24e2f5f..72da3aa 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.apache.camel.support.ServiceSupport;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
index 49ff49a..9791f21 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
@@ -25,7 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.apache.camel.support.ServiceSupport;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
index b650dbd..73f30a8 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
index c096826..27fb6be 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index c944576..7a410bc 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.apache.camel.support.ServiceSupport;
import org.junit.Before;
import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
index d3364e6..935a388 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
@@ -18,7 +18,7 @@ package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.junit.Test;
public class ThrottlingInflightRoutePolicyTest extends ContextTestSupport {
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index d72e7ab..63afcea 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -59,6 +59,11 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
+ @Override
+ public AsyncProcessor getProcessor() {
+ return processor;
+ }
+
public ExceptionHandler getExceptionHandler() {
if (exceptionHandler == null) {
exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
index 0d01804..6b8e2de 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
@@ -21,7 +21,7 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
diff --git a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
index d0dac16..ba982c5 100644
--- a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
+++ b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
@@ -49,7 +49,7 @@
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
- <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+ <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
<property name="maxInflightExchanges" value="16"/>
<property name="resumePercentOfMax" value="25"/>
</bean>
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
index a877b62..8f31a07 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
@@ -23,7 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.impl.cluster.ClusterServiceHelper;
-import org.apache.camel.impl.cluster.ClusterServiceSelectors;
+import org.apache.camel.cluster.ClusterServiceSelectors;
import org.apache.camel.spi.Metadata;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
index 21894ce..56294ac 100644
--- a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
@@ -23,7 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.quartz2.QuartzComponent;
import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
index cbb9462..9792fbe 100644
--- a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
+++ b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
@@ -34,7 +34,7 @@
<property name="routeStartRepeatInterval" value="3000"/>
</bean>
- <bean id="throttlePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+ <bean id="throttlePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
<property name="maxInflightExchanges" value="10"/>
</bean>
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
index 728314c..2e95e92 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
@@ -26,7 +26,7 @@ import org.apache.camel.StatefulService;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.support.TestSubscriber;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.reactivestreams.Publisher;
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
index cdb10c6..402baff 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
@@ -26,7 +26,7 @@
<!-- START SNIPPET: e1 -->
<!-- configure our route policy to throttling based -->
- <bean id="myRoutePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+ <bean id="myRoutePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
<!-- we want at most 10 concurrent inflight exchanges -->
<property name="maxInflightExchanges" value="10"/>
<!-- and we want a low water mark value of 20% of the max which means that
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index 07da23d..122c002 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -29,7 +29,7 @@ import org.apache.camel.support.ServiceHelper;
/**
* Connector {@link Producer} which is capable of performing before and after custom processing
- * via the {@link Pipeline }while processing (ie sending the message).
+ * via the {@link Pipeline} while processing (ie sending the message).
*/
public class ConnectorProducer extends DefaultAsyncProducer {
diff --git a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
index 5f1d890..34f08d7 100644
--- a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
+++ b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
@@ -42,7 +42,7 @@
</bean>
<!-- START SNIPPET: e1 -->
- <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+ <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
<!-- define the scope to be context scoped so we measure against total inflight exchanges
that means for both route1, route2 and route3 all together -->
<property name="scope" value="Context"/>