You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/12 11:26:38 UTC

[camel] 34/44: Isolate o.a.c.management from o.a.c.impl

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 387216d38b6b4ae3d300db7e4c23be40200caea7
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Oct 5 15:24:27 2018 +0200

    Isolate o.a.c.management from o.a.c.impl
---
 .../main/java/org/apache/camel/CamelContext.java   |  5 ++
 .../src/main/java/org/apache/camel/Consumer.java   |  3 +
 .../src/main/java/org/apache/camel/Route.java      |  7 +++
 .../camel}/cluster/ClusterServiceHelper.java       |  3 +-
 .../camel}/cluster/ClusterServiceSelectors.java    |  3 +-
 .../org/apache/camel/support/ServiceHelper.java    |  4 +-
 .../org/apache/camel/builder/RouteBuilder.java     | 21 -------
 .../camel/component/seda/SedaPollingConsumer.java  |  6 ++
 .../impl/cluster/ClusteredRouteController.java     |  2 +
 .../camel/impl/cluster/ClusteredRoutePolicy.java   |  4 +-
 .../impl/cluster/ClusteredRoutePolicyFactory.java  |  1 +
 .../DefaultManagementLifecycleStrategy.java        | 64 +++++++++++-----------
 .../InstrumentationInterceptStrategy.java          | 13 ++---
 .../management/mbean/ManagedClusterService.java    |  2 +-
 .../ManagedThrottlingExceptionRoutePolicy.java     |  4 +-
 .../ManagedThrottlingInflightRoutePolicy.java      |  2 +-
 .../camel/support/EventDrivenPollingConsumer.java  |  5 ++
 .../camel/support/PollingConsumerSupport.java      |  6 ++
 .../camel/support/ProcessorPollingConsumer.java    |  5 ++
 .../apache/camel/support/RoutePolicySupport.java   | 41 ++++----------
 .../ThrottlingExceptionHalfOpenHandler.java        |  2 +-
 .../ThrottlingExceptionRoutePolicy.java            |  4 +-
 .../ThrottlingInflightRoutePolicy.java             |  5 +-
 .../camel/component/test/TestEndpointTest.java     |  5 ++
 .../impl/cluster/ClusterServiceSelectorTest.java   |  5 +-
 .../ManagedThrottlingExceptionRoutePolicyTest.java |  4 +-
 .../ManagedThrottlingInflightRoutePolicyTest.java  |  2 +-
 ...xceptionRoutePolicyHalfOpenHandlerSedaTest.java |  4 +-
 ...ingExceptionRoutePolicyHalfOpenHandlerTest.java |  4 +-
 ...ThrottlingExceptionRoutePolicyHalfOpenTest.java |  2 +-
 ...lingExceptionRoutePolicyKeepOpenOnInitTest.java |  2 +-
 ...tlingExceptionRoutePolicyOpenViaConfigTest.java |  2 +-
 .../ThrottlingExceptionRoutePolicyTest.java        |  4 +-
 .../ThrottlingInflightRoutePolicyTest.java         |  2 +-
 .../component/disruptor/DisruptorConsumer.java     |  5 ++
 .../jms/JmsThrottlingInflightRoutePolicyTest.java  |  2 +-
 .../tx/JMSTransactionThrottlingRoutePolicyTest.xml |  2 +-
 .../camel/component/master/MasterComponent.java    |  2 +-
 .../quartz2/MultiplePoliciesOnRouteTest.java       |  2 +-
 .../camel/routepolicy/quartz2/MultiplePolicies.xml |  2 +-
 .../BackpressurePublisherRoutePolicyTest.java      |  2 +-
 .../ThrottlingInflightRoutePolicyTest.xml          |  2 +-
 .../component/connector/ConnectorProducer.java     |  2 +-
 .../resources/META-INF/spring/camel-server.xml     |  2 +-
 44 files changed, 138 insertions(+), 133 deletions(-)

diff --git a/camel-api/src/main/java/org/apache/camel/CamelContext.java b/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 741475c..704624b 100644
--- a/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -179,6 +179,11 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     String getManagementName();
 
     /**
+     * Sets the name this {@link CamelContext} will be registered in JMX.
+     */
+    void setManagementName(String name);
+
+    /**
      * Gets the version of the this CamelContext.
      *
      * @return the version
diff --git a/camel-api/src/main/java/org/apache/camel/Consumer.java b/camel-api/src/main/java/org/apache/camel/Consumer.java
index 89def42..825fe6a 100644
--- a/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -20,4 +20,7 @@ package org.apache.camel;
  * A consumer of message exchanges from an {@link Endpoint}
  */
 public interface Consumer extends Service, EndpointAware {
+
+    Processor getProcessor();
+
 }
diff --git a/camel-api/src/main/java/org/apache/camel/Route.java b/camel-api/src/main/java/org/apache/camel/Route.java
index 53fddce..23c46e0 100644
--- a/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/camel-api/src/main/java/org/apache/camel/Route.java
@@ -74,6 +74,13 @@ public interface Route extends EndpointAware {
     Consumer getConsumer();
 
     /**
+     * Gets the {@link Processor}
+     *
+     * @return the processor
+     */
+    Processor getProcessor();
+
+    /**
      * Whether or not the route supports suspension (suspend and resume)
      *
      * @return <tt>true</tt> if this route supports suspension
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
similarity index 95%
rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java
rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
index 22b7706..1571e0f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java
+++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl.cluster;
+package org.apache.camel.cluster;
 
 import java.util.Optional;
 import java.util.Set;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.util.ObjectHelper;
 
 public final class ClusterServiceHelper {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
similarity index 98%
rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java
rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
index aa86a5e..485f648 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java
+++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl.cluster;
+package org.apache.camel.cluster;
 
 import java.util.Collection;
 import java.util.Comparator;
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
index 9ef50b6..111eb37 100644
--- a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
+++ b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java
@@ -268,10 +268,10 @@ public final class ServiceHelper {
      * 
      * @param service the service
      * @return <tt>true</tt> if either <tt>resume</tt> method or
-     *         {@link #startService(Service)} was called, <tt>false</tt>
+     *         {@link #startService(Object)} was called, <tt>false</tt>
      *         otherwise.
      * @throws Exception is thrown if error occurred
-     * @see #startService(Service)
+     * @see #startService(Object)
      */
     public static boolean resumeService(Object service) throws Exception {
         if (service instanceof Suspendable && service instanceof SuspendableService) {
diff --git a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
index 940c4d7..d5349f2 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
@@ -27,7 +27,6 @@ import org.apache.camel.Component;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.component.properties.PropertiesComponent;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.InterceptDefinition;
 import org.apache.camel.model.InterceptFromDefinition;
@@ -366,17 +365,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild
         return getRouteCollection().onCompletion();
     }
     
-    // Properties
-    // -----------------------------------------------------------------------
-    public ModelCamelContext getContext() {
-        ModelCamelContext context = super.getContext();
-        if (context == null) {
-            context = createContainer();
-            setContext(context);
-        }
-        return context;
-    }
-
     public void addRoutesToCamelContext(CamelContext context) throws Exception {
         // must configure routes before rests
         configureRoutes((ModelCamelContext) context);
@@ -568,15 +556,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild
         return this.routeCollection;
     }
 
-    /**
-     * Factory method
-     *
-     * @return the CamelContext
-     */
-    protected ModelCamelContext createContainer() {
-        return new DefaultCamelContext();
-    }
-
     protected void configureRest(RestDefinition rest) {
         // noop
     }
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
index ae8ae38..6abbd5d 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.IsSingleton;
+import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.PollingConsumerSupport;
 
@@ -36,6 +37,11 @@ public class SedaPollingConsumer extends PollingConsumerSupport implements IsSin
     }
 
     @Override
+    public Processor getProcessor() {
+        return null;
+    }
+
+    @Override
     public Exchange receive() {
         try {
             return getEndpoint().getQueue().take();
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
index aae4459..332a4fe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java
@@ -35,11 +35,13 @@ import org.apache.camel.NamedNode;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.cluster.ClusterServiceSelectors;
 import org.apache.camel.impl.DefaultRouteController;
 import org.apache.camel.meta.Experimental;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.cluster.ClusterServiceHelper;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
index 98420ab..29a7d67 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
@@ -37,14 +37,14 @@ import org.apache.camel.cluster.CamelClusterEventListener;
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.ClusterServiceSelectors;
 import org.apache.camel.management.event.CamelContextStartedEvent;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.cluster.ClusterServiceHelper;
 import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.support.RoutePolicySupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @ManagedResource(description = "Clustered Route policy using")
 public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
index 9ff0e2f..ff058a2 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java
@@ -20,6 +20,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.NamedNode;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.cluster.ClusterServiceSelectors;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
 import org.apache.camel.util.ObjectHelper;
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 5806f35..1971782 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -38,6 +38,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.ManagementStatisticsLevel;
+import org.apache.camel.NamedNode;
 import org.apache.camel.NonManagedService;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -48,11 +49,8 @@ import org.apache.camel.StartupListener;
 import org.apache.camel.TimerListener;
 import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultEndpointRegistry;
-import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
 import org.apache.camel.management.mbean.ManagedBacklogDebugger;
 import org.apache.camel.management.mbean.ManagedBacklogTracer;
@@ -87,6 +85,7 @@ import org.apache.camel.runtimecatalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -118,7 +117,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
 
     // the wrapped processors is for performance counters, which are in use for the created routes
     // when a route is removed, we should remove the associated processors from this map
-    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
+    private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
     private final List<PreRegisterService> preServices = new ArrayList<>();
     private final TimerListenerManager loadTimer = new ManagedLoadTimer();
     private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
@@ -149,7 +148,11 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
         Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
 
         String name = context.getName();
-        String managementName = context.getManagementNameStrategy().getName();
+        String managementName = context.getManagementName();
+
+        if (managementName == null) {
+            managementName = context.getManagementNameStrategy().getName();
+        }
 
         try {
             boolean done = false;
@@ -189,9 +192,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
         }
 
         // set the name we are going to use
-        if (context instanceof DefaultCamelContext) {
-            ((DefaultCamelContext) context).setManagementName(managementName);
-        }
+        context.setManagementName(managementName);
 
         try {
             manageObject(mc);
@@ -503,8 +504,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
             answer = new ManagedConsumerCache(context, (ConsumerCache) service);
         } else if (service instanceof ProducerCache) {
             answer = new ManagedProducerCache(context, (ProducerCache) service);
-        } else if (service instanceof DefaultEndpointRegistry) {
-            answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
+        } else if (service instanceof EndpointRegistry) {
+            answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
         } else if (service instanceof TypeConverterRegistry) {
             answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
         } else if (service instanceof RestRegistry) {
@@ -545,7 +546,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
         // a bit of magic here as the processors we want to manage have already been registered
         // in the wrapped processors map when Camel have instrumented the route on route initialization
         // so the idea is now to only manage the processors from the map
-        KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
+        KeyValueHolder<NamedNode, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
         if (holder == null) {
             // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
             return null;
@@ -595,22 +596,19 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
 
             // get the wrapped instrumentation processor from this route
             // and set me as the counter
-            if (route instanceof EventDrivenConsumerRoute) {
-                EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
-                Processor processor = edcr.getProcessor();
-                if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
-                    CamelInternalProcessor internal = (CamelInternalProcessor) processor;
-                    ManagedRoute routeMBean = (ManagedRoute) mr;
-
-                    CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
-                    if (task != null) {
-                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
-                        if (camelContextMBean != null) {
-                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
-                            task.setCounter(wrapper);
-                        } else {
-                            task.setCounter(routeMBean);
-                        }
+            Processor processor = route.getProcessor();
+            if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
+                CamelInternalProcessor internal = (CamelInternalProcessor) processor;
+                ManagedRoute routeMBean = (ManagedRoute) mr;
+
+                CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
+                if (task != null) {
+                    // we need to wrap the counter with the camel context so we get stats updated on the context as well
+                    if (camelContextMBean != null) {
+                        CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
+                        task.setCounter(wrapper);
+                    } else {
+                        task.setCounter(routeMBean);
                     }
                 }
             }
@@ -746,7 +744,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
 
         // Create a map (ProcessorType -> PerformanceCounter)
         // to be passed to InstrumentationInterceptStrategy.
-        Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<>();
+        Map<NamedNode, PerformanceCounter> registeredCounters = new HashMap<>();
 
         // Each processor in a route will have its own performance counter.
         // These performance counter will be embedded to InstrumentationProcessor
@@ -775,9 +773,9 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
         for (Route route : routes) {
             String id = route.getId();
 
-            Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
+            Iterator<KeyValueHolder<NamedNode, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
             while (it.hasNext()) {
-                KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
+                KeyValueHolder<NamedNode, InstrumentationProcessor> holder = it.next();
                 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
                 if (def != null && id.equals(def.getId())) {
                     it.remove();
@@ -788,7 +786,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
     }
 
     private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
-                                             Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
+                                             Map<NamedNode, PerformanceCounter> registeredCounters) {
 
         // traverse children if any exists
         List<ProcessorDefinition<?>> children = processor.getOutputs();
diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
index 0a1fe79..186fc2c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Processor;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.util.KeyValueHolder;
 
@@ -36,21 +35,21 @@ import org.apache.camel.util.KeyValueHolder;
  */
 public class InstrumentationInterceptStrategy implements InterceptStrategy {
 
-    private Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters;
-    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors;
+    private Map<NamedNode, PerformanceCounter> registeredCounters;
+    private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors;
 
-    public InstrumentationInterceptStrategy(Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters,
-            Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors) {
+    public InstrumentationInterceptStrategy(Map<NamedNode, PerformanceCounter> registeredCounters,
+            Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors) {
         this.registeredCounters = registeredCounters;
         this.wrappedProcessors = wrappedProcessors;
     }
 
-    public PerformanceCounter prepareProcessor(ProcessorDefinition<?> definition, Processor target, InstrumentationProcessor advice) {
+    public PerformanceCounter prepareProcessor(NamedNode definition, Processor target, InstrumentationProcessor advice) {
         PerformanceCounter counter = registeredCounters.get(definition);
         if (counter != null) {
             // add it to the mapping of wrappers so we can later change it to a
             // decorated counter when we register the processor
-            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice);
+            KeyValueHolder<NamedNode, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice);
             wrappedProcessors.put(target, holder);
         }
         return counter;
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
index cb4c427..52b8cb5 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java
@@ -25,7 +25,7 @@ import org.apache.camel.ServiceStatus;
 import org.apache.camel.StatefulService;
 import org.apache.camel.api.management.mbean.ManagedClusterServiceMBean;
 import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.cluster.ClusterServiceHelper;
+import org.apache.camel.cluster.ClusterServiceHelper;
 import org.apache.camel.spi.ManagementStrategy;
 
 public class ManagedClusterService implements ManagedClusterServiceMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
index 8030eae..2f49504d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java
@@ -19,8 +19,8 @@ package org.apache.camel.management.mbean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 
 @ManagedResource(description = "Managed ThrottlingExceptionRoutePolicy")
 public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implements ManagedThrottlingExceptionRoutePolicyMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
index f8c1c74..7c287e4 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java
@@ -20,7 +20,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedThrottlingInflightRoutePolicyMBean;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 
 @ManagedResource(description = "Managed ThrottlingInflightRoutePolicy")
 public class ManagedThrottlingInflightRoutePolicy extends ManagedService implements ManagedThrottlingInflightRoutePolicyMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 4b424bd..17db223 100644
--- a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -67,6 +67,11 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
     }
 
+    @Override
+    public Processor getProcessor() {
+        return this;
+    }
+
     public boolean isBlockWhenFull() {
         return blockWhenFull;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index d69a92f..09224fc 100644
--- a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -18,6 +18,7 @@ package org.apache.camel.support;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 
 /**
@@ -42,6 +43,11 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P
         return endpoint;
     }
 
+    @Override
+    public Processor getProcessor() {
+        return null;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
index 9893746..17a63f2 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java
@@ -37,6 +37,11 @@ public class ProcessorPollingConsumer extends PollingConsumerSupport implements
         this.processor = processor;
     }
 
+    @Override
+    public Processor getProcessor() {
+        return processor;
+    }
+
     protected void doStart() throws Exception {
         ServiceHelper.startService(processor);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
index a74eea8..90d20e4 100644
--- a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
-import org.apache.camel.Suspendable;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.RoutePolicy;
 
@@ -107,21 +106,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route
      * @return <tt>true</tt> if the consumer was suspended or stopped, <tt>false</tt> if the consumer was already suspend or stopped
      */
     public boolean suspendOrStopConsumer(Consumer consumer) throws Exception {
-        if (consumer instanceof Suspendable) {
-            boolean suspended = ServiceHelper.suspendService(consumer);
-            if (suspended) {
-                log.debug("Suspended consumer {}", consumer);
-            } else {
-                log.trace("Consumer already suspended {}", consumer);
-            }
-            return suspended;
-        }
-        if (!ServiceHelper.isStopped(consumer)) {
-            ServiceHelper.stopService(consumer);
-            log.debug("Stopped consumer {}", consumer);
-            return true;
+        boolean suspended = ServiceHelper.suspendService(consumer);
+        if (suspended) {
+            log.debug("Suspended consumer {}", consumer);
+        } else {
+            log.trace("Consumer already suspended {}", consumer);
         }
-        return false;
+        return suspended;
     }
 
     /**
@@ -134,21 +125,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route
      * @return <tt>true</tt> if the consumer was resumed or started, <tt>false</tt> if the consumer was already resumed or started
      */
     public boolean resumeOrStartConsumer(Consumer consumer) throws Exception {
-        if (consumer instanceof Suspendable) {
-            boolean resumed = ServiceHelper.resumeService(consumer);
-            if (resumed) {
-                log.debug("Resumed consumer {}", consumer);
-            } else {
-                log.trace("Consumer already resumed {}", consumer);
-            }
-            return resumed;
-        }
-        if (!ServiceHelper.isStarted(consumer)) {
-            ServiceHelper.startService(consumer);
-            log.debug("Started consumer {}", consumer);
-            return true;
+        boolean resumed = ServiceHelper.resumeService(consumer);
+        if (resumed) {
+            log.debug("Resumed consumer {}", consumer);
+        } else {
+            log.trace("Consumer already resumed {}", consumer);
         }
-        return false;
+        return resumed;
     }
 
     public void startRoute(Route route) throws Exception {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
similarity index 96%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
index 84607e76..7bdb502 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
 
 /**
  * Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
similarity index 99%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
index 042559a..86f3dcb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
 
 import java.util.List;
 import java.util.Timer;
@@ -30,8 +30,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Route;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Modeled after the circuit breaker {@link ThrottlingInflightRoutePolicy}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
similarity index 98%
rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
index 4947c47..d662c50 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl;
+package org.apache.camel.throttling;
 
 import java.util.EventObject;
 import java.util.LinkedHashSet;
@@ -34,7 +34,6 @@ import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.support.RoutePolicySupport;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.slf4j.LoggerFactory;
 
 /**
  * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
@@ -224,7 +223,7 @@ public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements
     }
 
     protected CamelLogger createLogger() {
-        return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel());
+        return new CamelLogger(log, getLoggingLevel());
     }
 
     private int getSize(Route route, Exchange exchange) {
diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
index 5d61765..03c529e 100644
--- a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
@@ -79,6 +79,11 @@ public class TestEndpointTest extends ContextTestSupport {
                 }
 
                 @Override
+                public Processor getProcessor() {
+                    return null;
+                }
+
+                @Override
                 public void start() throws Exception {
                     // when starting then send a message to the processor
                     Exchange exchange = new DefaultExchange(getEndpoint());
diff --git a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
index 318e1fc..d0897da 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java
@@ -24,13 +24,14 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.ClusterServiceSelectors;
 import org.apache.camel.component.file.cluster.FileLockClusterService;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.camel.impl.cluster.ClusterServiceHelper.lookupService;
-import static org.apache.camel.impl.cluster.ClusterServiceHelper.mandatoryLookupService;
+import static org.apache.camel.cluster.ClusterServiceHelper.lookupService;
+import static org.apache.camel.cluster.ClusterServiceHelper.mandatoryLookupService;
 
 
 public class ClusterServiceSelectorTest {
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
index 222f3b6..aef3ea8 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
@@ -27,8 +27,8 @@ import org.apache.camel.Processor;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.junit.Test;
 
 public class ManagedThrottlingExceptionRoutePolicyTest  extends ManagementTestSupport {
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
index 43141da..2cb25ae 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java
@@ -23,7 +23,7 @@ import javax.management.ObjectName;
 
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.junit.Test;
 
 public class ManagedThrottlingInflightRoutePolicyTest extends ManagementTestSupport {
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
index 44b89c5..fd66596 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.support.ServiceSupport;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
index 24e2f5f..72da3aa 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.support.ServiceSupport;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
index 49ff49a..9791f21 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
@@ -25,7 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.support.ServiceSupport;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
index b650dbd..73f30a8 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
index c096826..27fb6be 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index c944576..7a410bc 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -25,8 +25,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.support.ServiceSupport;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
index d3364e6..935a388 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
@@ -18,7 +18,7 @@ package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.junit.Test;
 
 public class ThrottlingInflightRoutePolicyTest extends ContextTestSupport {
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index d72e7ab..63afcea 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -59,6 +59,11 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
+    @Override
+    public AsyncProcessor getProcessor() {
+        return processor;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         if (exceptionHandler == null) {
             exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
index 0d01804..6b8e2de 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java
@@ -21,7 +21,7 @@ import javax.jms.ConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
diff --git a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
index d0dac16..ba982c5 100644
--- a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
+++ b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml
@@ -49,7 +49,7 @@
         <property name="transactionManager" ref="jmsTransactionManager"/>
     </bean>
     
-    <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+    <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
         <property name="maxInflightExchanges" value="16"/>
         <property name="resumePercentOfMax" value="25"/>
     </bean>
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
index a877b62..8f31a07 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
@@ -23,7 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.impl.cluster.ClusterServiceHelper;
-import org.apache.camel.impl.cluster.ClusterServiceSelectors;
+import org.apache.camel.cluster.ClusterServiceSelectors;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
index 21894ce..56294ac 100644
--- a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java
@@ -23,7 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.quartz2.QuartzComponent;
 import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
index cbb9462..9792fbe 100644
--- a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
+++ b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml
@@ -34,7 +34,7 @@
     <property name="routeStartRepeatInterval" value="3000"/>
   </bean>
 
-  <bean id="throttlePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+  <bean id="throttlePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
     <property name="maxInflightExchanges" value="10"/>
   </bean>
 
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
index 728314c..2e95e92 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
@@ -26,7 +26,7 @@ import org.apache.camel.StatefulService;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.support.TestSubscriber;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
index cdb10c6..402baff 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml
@@ -26,7 +26,7 @@
 
     <!-- START SNIPPET: e1 -->
     <!-- configure our route policy to throttling based -->
-    <bean id="myRoutePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+    <bean id="myRoutePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
         <!-- we want at most 10 concurrent inflight exchanges -->
         <property name="maxInflightExchanges" value="10"/>
         <!-- and we want a low water mark value of 20% of the max which means that
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index 07da23d..122c002 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -29,7 +29,7 @@ import org.apache.camel.support.ServiceHelper;
 
 /**
  * Connector {@link Producer} which is capable of performing before and after custom processing
- * via the {@link Pipeline }while processing (ie sending the message).
+ * via the {@link Pipeline} while processing (ie sending the message).
  */
 public class ConnectorProducer extends DefaultAsyncProducer {
 
diff --git a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
index 5f1d890..34f08d7 100644
--- a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
+++ b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml
@@ -42,7 +42,7 @@
   </bean>
 
   <!-- START SNIPPET: e1 -->
-  <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
+  <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy">
     <!-- define the scope to be context scoped so we measure against total inflight exchanges
          that means for both route1, route2 and route3 all together -->
     <property name="scope" value="Context"/>