You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/26 16:50:16 UTC

svn commit: r1139800 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/management/ main/java/org/apache/camel/management/mbean/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/management/

Author: davsclaus
Date: Sun Jun 26 14:50:15 2011
New Revision: 1139800

URL: http://svn.apache.org/viewvc?rev=1139800&view=rev
Log:
CAMEL-4156: Introduced ManagementObjectStrategy as an SPI to make it easier to reuse the actual mbeans used by Camel, and to allow end users to plug in a custom strategy if needed.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Sun Jun 26 14:50:15 2011
@@ -40,37 +40,18 @@ import org.apache.camel.Route;
 import org.apache.camel.Service;
 import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.impl.DefaultCamelContextNameStrategy;
 import org.apache.camel.impl.EndpointRegistry;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
 import org.apache.camel.impl.ProducerCache;
-import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
-import org.apache.camel.management.mbean.ManagedBeanProcessor;
-import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
-import org.apache.camel.management.mbean.ManagedCamelContext;
-import org.apache.camel.management.mbean.ManagedComponent;
-import org.apache.camel.management.mbean.ManagedConsumer;
 import org.apache.camel.management.mbean.ManagedConsumerCache;
-import org.apache.camel.management.mbean.ManagedDelayer;
 import org.apache.camel.management.mbean.ManagedEndpoint;
 import org.apache.camel.management.mbean.ManagedEndpointRegistry;
-import org.apache.camel.management.mbean.ManagedErrorHandler;
-import org.apache.camel.management.mbean.ManagedEventNotifier;
-import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.management.mbean.ManagedProcessor;
-import org.apache.camel.management.mbean.ManagedProducer;
 import org.apache.camel.management.mbean.ManagedProducerCache;
-import org.apache.camel.management.mbean.ManagedRoute;
-import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
-import org.apache.camel.management.mbean.ManagedSendProcessor;
 import org.apache.camel.management.mbean.ManagedService;
-import org.apache.camel.management.mbean.ManagedSuspendableRoute;
-import org.apache.camel.management.mbean.ManagedThreadPool;
-import org.apache.camel.management.mbean.ManagedThrottler;
 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedTracer;
 import org.apache.camel.model.AOPDefinition;
@@ -80,19 +61,13 @@ import org.apache.camel.model.OnExceptio
 import org.apache.camel.model.PolicyDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.processor.Delayer;
-import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.processor.ErrorHandler;
-import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.processor.Throttler;
 import org.apache.camel.processor.interceptor.Tracer;
-import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.CamelContextNameStrategy;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementAgent;
 import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.spi.ManagementObjectStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.UnitOfWork;
@@ -134,8 +109,7 @@ public class DefaultManagementLifecycleS
     }
 
     public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
-        ManagedCamelContext mc = new ManagedCamelContext(context);
-        mc.init(context.getManagementStrategy());
+        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
 
         String managementName = context.getManagementName() != null ? context.getManagementName() : context.getName();
 
@@ -199,7 +173,7 @@ public class DefaultManagementLifecycleS
         initialized = true;
     }
 
-    private String findFreeName(ManagedCamelContext mc, CamelContextNameStrategy strategy, String managementName) throws MalformedObjectNameException {
+    private String findFreeName(Object mc, CamelContextNameStrategy strategy, String managementName) throws MalformedObjectNameException {
         boolean done = false;
         String name = null;
         // start from 2 as the existing name is considered the 1st
@@ -228,8 +202,7 @@ public class DefaultManagementLifecycleS
             return;
         }
         try {
-            ManagedCamelContext mc = new ManagedCamelContext(context);
-            mc.init(context.getManagementStrategy());
+            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
             // the context could have been removed already
             if (getManagementStrategy().isManaged(mc, null)) {
                 getManagementStrategy().unmanageObject(mc);
@@ -245,7 +218,7 @@ public class DefaultManagementLifecycleS
             return;
         }
         try {
-            Object mc = getManagedObjectForComponent(name, component);
+            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);;
             getManagementStrategy().manageObject(mc);
         } catch (Exception e) {
             LOG.warn("Could not register Component MBean", e);
@@ -258,24 +231,13 @@ public class DefaultManagementLifecycleS
             return;
         }
         try {
-            Object mc = getManagedObjectForComponent(name, component);
+            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);;
             getManagementStrategy().unmanageObject(mc);
         } catch (Exception e) {
             LOG.warn("Could not unregister Component MBean", e);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private Object getManagedObjectForComponent(String name, Component component) {
-        if (component instanceof ManagementAware) {
-            return ((ManagementAware) component).getManagedObject(component);
-        } else {
-            ManagedComponent mc = new ManagedComponent(name, component);
-            mc.init(getManagementStrategy());
-            return mc;
-        }
-    }
-
     /**
      * If the endpoint is an instance of ManagedResource then register it with the
      * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
@@ -290,12 +252,12 @@ public class DefaultManagementLifecycleS
         }
 
         try {
-            Object managedObject = getManagedObjectForEndpoint(endpoint);
-            if (managedObject == null) {
+            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
+            if (me == null) {
                 // endpoint should not be managed
                 return;
             }
-            getManagementStrategy().manageObject(managedObject);
+            getManagementStrategy().manageObject(me);
         } catch (Exception e) {
             LOG.warn("Could not register Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
         }
@@ -308,33 +270,13 @@ public class DefaultManagementLifecycleS
         }
 
         try {
-            Object me = getManagedObjectForEndpoint(endpoint);
+            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
             getManagementStrategy().unmanageObject(me);
         } catch (Exception e) {
             LOG.warn("Could not unregister Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private Object getManagedObjectForEndpoint(Endpoint endpoint) {
-        // we only want to manage singleton endpoints
-        if (!endpoint.isSingleton()) {
-            return null;
-        }
-
-        if (endpoint instanceof ManagementAware) {
-            return ((ManagementAware) endpoint).getManagedObject(endpoint);
-        } else if (endpoint instanceof BrowsableEndpoint) {
-            ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
-            me.init(getManagementStrategy());
-            return me;
-        } else {
-            ManagedEndpoint me = new ManagedEndpoint(endpoint);
-            me.init(getManagementStrategy());
-            return me;
-        }
-    }
-
     public void onServiceAdd(CamelContext context, Service service, Route route) {
         // services can by any kind of misc type but also processors
         // so we have special logic when its a processor
@@ -386,7 +328,7 @@ public class DefaultManagementLifecycleS
             return null;
         }
 
-        ManagedService answer = null;
+        Object answer = null;
 
         if (service instanceof ManagementAware) {
             return ((ManagementAware) service).getManagedObject(service);
@@ -396,18 +338,13 @@ public class DefaultManagementLifecycleS
             mt.init(getManagementStrategy());
             return mt;
         } else if (service instanceof EventNotifier) {
-            // special for event notifier
-            ManagedEventNotifier men = new ManagedEventNotifier(context, (EventNotifier) service);
-            men.init(getManagementStrategy());
-            return men;
+            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
         } else if (service instanceof Producer) {
-            answer = new ManagedProducer(context, (Producer) service);
-        } else if (service instanceof ScheduledPollConsumer) {
-            answer = new ManagedScheduledPollConsumer(context, (ScheduledPollConsumer) service);
+            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
         } else if (service instanceof Consumer) {
-            answer = new ManagedConsumer(context, (Consumer) service);
+            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
         } else if (service instanceof Processor) {
-            // special for processors
+            // special for processors as we need to do some extra work
             return getManagedObjectForProcessor(context, (Processor) service, route);
         } else if (service instanceof ThrottlingInflightRoutePolicy) {
             answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
@@ -419,16 +356,16 @@ public class DefaultManagementLifecycleS
             answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
         } else if (service != null) {
             // fallback as generic service
-            answer = new ManagedService(context, service);
+            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
         }
 
-        if (answer != null) {
-            answer.setRoute(route);
-            answer.init(getManagementStrategy());
+        if (answer != null && answer instanceof ManagedService) {
+            ManagedService ms = (ManagedService) answer;
+            ms.setRoute(route);
+            ms.init(getManagementStrategy());
             return answer;
         } else {
-            // not supported
-            return null;
+            return answer;
         }
     }
 
@@ -443,7 +380,7 @@ public class DefaultManagementLifecycleS
         }
 
         // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
-        Object managedObject = createManagedObjectForProcessor(context, processor, holder.getKey(), route);
+        Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
         // only manage if we have a name for it as otherwise we do not want to manage it anyway
         if (managedObject != null) {
             // is it a performance counter then we need to set our counter
@@ -451,7 +388,7 @@ public class DefaultManagementLifecycleS
                 InstrumentationProcessor counter = holder.getValue();
                 if (counter != null) {
                     // change counter to us
-                    counter.setCounter((ManagedPerformanceCounter) managedObject);
+                    counter.setCounter(managedObject);
                 }
             }
         }
@@ -459,59 +396,6 @@ public class DefaultManagementLifecycleS
         return managedObject;
     }
 
-    @SuppressWarnings("unchecked")
-    private Object createManagedObjectForProcessor(CamelContext context, Processor processor,
-                                                   ProcessorDefinition definition, Route route) {
-        ManagedProcessor answer = null;
-
-        // unwrap delegates as we want the real target processor
-        Processor target = processor;
-        while (target != null) {
-
-            // skip error handlers
-            if (target instanceof ErrorHandler) {
-                return false;
-            }
-
-            // look for specialized processor which we should prefer to use
-            if (target instanceof Delayer) {
-                answer = new ManagedDelayer(context, (Delayer) target, definition);
-            } else if (target instanceof Throttler) {
-                answer = new ManagedThrottler(context, (Throttler) target, definition);
-            } else if (target instanceof SendProcessor) {
-                answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
-            } else if (target instanceof BeanProcessor) {
-                answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
-            } else if (target instanceof ManagementAware) {
-                return ((ManagementAware) target).getManagedObject(processor);
-            }
-
-            if (answer != null) {
-                // break out as we found an answer
-                break;
-            }
-
-            // no answer yet, so unwrap any delegates and try again
-            if (target instanceof DelegateProcessor) {
-                target = ((DelegateProcessor) target).getProcessor();
-            } else if (target instanceof DelegateAsyncProcessor) {
-                target = ((DelegateAsyncProcessor) target).getProcessor();
-            } else {
-                // no delegate so we dont have any target to try next
-                break;
-            }
-        }
-
-        if (answer == null) {
-            // fallback to a generic processor
-            answer = new ManagedProcessor(context, target, definition);
-        }
-
-        answer.setRoute(route);
-        answer.init(getManagementStrategy());
-        return answer;
-    }
-
     public void onRoutesAdd(Collection<Route> routes) {
         for (Route route : routes) {
 
@@ -529,13 +413,7 @@ public class DefaultManagementLifecycleS
                 continue;
             }
 
-            ManagedRoute mr;
-            if (route.supportsSuspension()) {
-                mr = new ManagedSuspendableRoute(camelContext, route);
-            } else {
-                mr = new ManagedRoute(camelContext, route);
-            }
-            mr.init(getManagementStrategy());
+            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
 
             // skip already managed routes, for example if the route has been restarted
             if (getManagementStrategy().isManaged(mr, null)) {
@@ -571,8 +449,7 @@ public class DefaultManagementLifecycleS
         }
 
         for (Route route : routes) {
-            ManagedRoute mr = new ManagedRoute(camelContext, route);
-            mr.init(getManagementStrategy());
+            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
 
             // skip unmanaged routes
             if (!getManagementStrategy().isManaged(mr, null)) {
@@ -594,8 +471,7 @@ public class DefaultManagementLifecycleS
             return;
         }
 
-        ManagedErrorHandler me = new ManagedErrorHandler(routeContext, errorHandler, errorHandlerBuilder);
-        me.init(getManagementStrategy());
+        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
 
         // skip already managed services, for example if a route has been restarted
         if (getManagementStrategy().isManaged(me, null)) {
@@ -618,8 +494,7 @@ public class DefaultManagementLifecycleS
             return;
         }
 
-        ManagedThreadPool mtp = new ManagedThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
-        mtp.init(getManagementStrategy());
+        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
 
         // skip already managed services, for example if a route has been restarted
         if (getManagementStrategy().isManaged(mtp, null)) {
@@ -723,9 +598,15 @@ public class DefaultManagementLifecycleS
     }
 
     private ManagementStrategy getManagementStrategy() {
+        ObjectHelper.notNull(camelContext, "CamelContext");
         return camelContext.getManagementStrategy();
     }
 
+    private ManagementObjectStrategy getManagementObjectStrategy() {
+        ObjectHelper.notNull(camelContext, "CamelContext");
+        return camelContext.getManagementStrategy().getManagementObjectStrategy();
+    }
+
     public void start() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext");
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1139800&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Sun Jun 26 14:50:15 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedBeanProcessor;
+import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
+import org.apache.camel.management.mbean.ManagedCamelContext;
+import org.apache.camel.management.mbean.ManagedComponent;
+import org.apache.camel.management.mbean.ManagedConsumer;
+import org.apache.camel.management.mbean.ManagedDelayer;
+import org.apache.camel.management.mbean.ManagedEndpoint;
+import org.apache.camel.management.mbean.ManagedErrorHandler;
+import org.apache.camel.management.mbean.ManagedEventNotifier;
+import org.apache.camel.management.mbean.ManagedProcessor;
+import org.apache.camel.management.mbean.ManagedProducer;
+import org.apache.camel.management.mbean.ManagedRoute;
+import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedSendProcessor;
+import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedSuspendableRoute;
+import org.apache.camel.management.mbean.ManagedThreadPool;
+import org.apache.camel.management.mbean.ManagedThrottler;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.Delayer;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.ErrorHandler;
+import org.apache.camel.processor.SendProcessor;
+import org.apache.camel.processor.Throttler;
+import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.spi.EventNotifier;
+import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.spi.ManagementObjectStrategy;
+import org.apache.camel.spi.RouteContext;
+
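+/**
+ * Default {@link ManagementObjectStrategy} which creates the built-in Camel managed objects (mbeans).
+ */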
+public class DefaultManagementObjectStrategy implements ManagementObjectStrategy {
+
+    public Object getManagedObjectForCamelContext(CamelContext context) {
+        ManagedCamelContext mc = new ManagedCamelContext(context);
+        mc.init(context.getManagementStrategy());
+        return mc;
+    }
+
+    public Object getManagedObjectForComponent(CamelContext context, Component component, String name) {
+        if (component instanceof ManagementAware) {
+            return ((ManagementAware) component).getManagedObject(component);
+        } else {
+            ManagedComponent mc = new ManagedComponent(name, component);
+            mc.init(context.getManagementStrategy());
+            return mc;
+        }
+    }
+
+    public Object getManagedObjectForEndpoint(CamelContext context, Endpoint endpoint) {
+        // we only want to manage singleton endpoints
+        if (!endpoint.isSingleton()) {
+            return null;
+        }
+
+        if (endpoint instanceof ManagementAware) {
+            return ((ManagementAware) endpoint).getManagedObject(endpoint);
+        } else if (endpoint instanceof BrowsableEndpoint) {
+            ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
+            me.init(context.getManagementStrategy());
+            return me;
+        } else {
+            ManagedEndpoint me = new ManagedEndpoint(endpoint);
+            me.init(context.getManagementStrategy());
+            return me;
+        }
+    }
+
+    public Object getManagedObjectForErrorHandler(CamelContext context, RouteContext routeContext,
+                                                  Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder) {
+        ManagedErrorHandler me = new ManagedErrorHandler(routeContext, errorHandler, errorHandlerBuilder);
+        me.init(context.getManagementStrategy());
+        return me;
+    }
+
+    public Object getManagedObjectForRoute(CamelContext context, Route route) {
+        ManagedRoute mr;
+        if (route.supportsSuspension()) {
+            mr = new ManagedSuspendableRoute(context, route);
+        } else {
+            mr = new ManagedRoute(context, route);
+        }
+        mr.init(context.getManagementStrategy());
+        return mr;
+    }
+
+    public Object getManagedObjectForThreadPool(CamelContext context, ThreadPoolExecutor threadPool,
+                                                String id, String sourceId, String routeId, String threadPoolProfileId) {
+        ManagedThreadPool mtp = new ManagedThreadPool(context, threadPool, id, sourceId, routeId, threadPoolProfileId);
+        mtp.init(context.getManagementStrategy());
+        return mtp;
+    }
+
+    public Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier) {
+        ManagedEventNotifier men = new ManagedEventNotifier(context, eventNotifier);
+        men.init(context.getManagementStrategy());
+        return men;
+    }
+
+    public Object getManagedObjectForConsumer(CamelContext context, Consumer consumer) {
+        ManagedConsumer mc;
+        if (consumer instanceof ScheduledPollConsumer) {
+            mc = new ManagedScheduledPollConsumer(context, (ScheduledPollConsumer) consumer);
+        } else {
+            mc = new ManagedConsumer(context, consumer);
+        }
+        mc.init(context.getManagementStrategy());
+        return mc;
+    }
+
+    public Object getManagedObjectForProducer(CamelContext context, Producer producer) {
+        ManagedProducer mp = new ManagedProducer(context, producer);
+        mp.init(context.getManagementStrategy());
+        return mp;
+    }
+
+    public Object getManagedObjectForService(CamelContext context, Service service) {
+        ManagedService mc = new ManagedService(context, service);
+        mc.init(context.getManagementStrategy());
+        return mc;
+    }
+
+    public Object getManagedObjectForProcessor(CamelContext context, Processor processor,
+                                               ProcessorDefinition definition, Route route) {
+        ManagedProcessor answer = null;
+
+        // unwrap delegates as we want the real target processor
+        Processor target = processor;
+        while (target != null) {
+
+            // skip error handlers
+            if (target instanceof ErrorHandler) {
+                return false;
+            }
+
+            // look for specialized processor which we should prefer to use
+            if (target instanceof Delayer) {
+                answer = new ManagedDelayer(context, (Delayer) target, definition);
+            } else if (target instanceof Throttler) {
+                answer = new ManagedThrottler(context, (Throttler) target, definition);
+            } else if (target instanceof SendProcessor) {
+                answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+            } else if (target instanceof BeanProcessor) {
+                answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
+            } else if (target instanceof ManagementAware) {
+                return ((ManagementAware) target).getManagedObject(processor);
+            }
+
+            if (answer != null) {
+                // break out as we found an answer
+                break;
+            }
+
+            // no answer yet, so unwrap any delegates and try again
+            if (target instanceof DelegateProcessor) {
+                target = ((DelegateProcessor) target).getProcessor();
+            } else if (target instanceof DelegateAsyncProcessor) {
+                target = ((DelegateAsyncProcessor) target).getProcessor();
+            } else {
+                // no delegate so we dont have any target to try next
+                break;
+            }
+        }
+
+        if (answer == null) {
+            // fallback to a generic processor
+            answer = new ManagedProcessor(context, target, definition);
+        }
+
+        answer.setRoute(route);
+        answer.init(context.getManagementStrategy());
+        return answer;
+    }
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java Sun Jun 26 14:50:15 2011
@@ -28,6 +28,7 @@ import org.apache.camel.spi.EventFactory
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ManagementAgent;
 import org.apache.camel.spi.ManagementNamingStrategy;
+import org.apache.camel.spi.ManagementObjectStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.util.ServiceHelper;
 import org.fusesource.commons.management.Statistic;
@@ -50,6 +51,7 @@ public class DefaultManagementStrategy i
     private List<EventNotifier> eventNotifiers = new ArrayList<EventNotifier>();
     private EventFactory eventFactory = new DefaultEventFactory();
     private ManagementNamingStrategy managementNamingStrategy;
+    private ManagementObjectStrategy managementObjectStrategy;
     private boolean onlyManageProcessorWithCustomId;
     private ManagementAgent managementAgent;
     private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.All;
@@ -90,6 +92,17 @@ public class DefaultManagementStrategy i
         this.managementNamingStrategy = managementNamingStrategy;
     }
 
+    public ManagementObjectStrategy getManagementObjectStrategy() {
+        if (managementObjectStrategy == null) {
+            managementObjectStrategy = new DefaultManagementObjectStrategy();
+        }
+        return managementObjectStrategy;
+    }
+
+    public void setManagementObjectStrategy(ManagementObjectStrategy managementObjectStrategy) {
+        this.managementObjectStrategy = managementObjectStrategy;
+    }
+
     public ManagementAgent getManagementAgent() {
         return managementAgent;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Sun Jun 26 14:50:15 2011
@@ -48,11 +48,16 @@ public class InstrumentationProcessor ex
         return "Instrumentation" + (type != null ? ":" + type : "") + "[" + processor + "]";
     }
 
-    public void setCounter(ManagedPerformanceCounter counter) {
+    public void setCounter(Object counter) {
+        ManagedPerformanceCounter mpc = null;
+        if (counter instanceof ManagedPerformanceCounter) {
+            mpc = (ManagedPerformanceCounter) counter;
+        }
+
         if (this.counter instanceof DelegatePerformanceCounter) {
-            ((DelegatePerformanceCounter) this.counter).setCounter(counter);
+            ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
         } else {
-            this.counter = counter;
+            this.counter = mpc;
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Sun Jun 26 14:50:15 2011
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import javax.management.ObjectName;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -253,9 +254,21 @@ public class ManagedCamelContext {
             // endpoint already exists
             return false;
         }
-        // create new endpoint by just getting it
-        context.getEndpoint(uri);
-        return true;
+
+        Endpoint endpoint = context.getEndpoint(uri);
+        if (endpoint != null) {
+            // ensure endpoint is registered, as the management strategy could have been configured to not always
+            // register new endpoints in JMX, so we need to check if it's registered, and if not, register it manually
+            ObjectName on = context.getManagementStrategy().getManagementNamingStrategy().getObjectNameForEndpoint(endpoint);
+            if (on != null && !context.getManagementStrategy().getManagementAgent().isRegistered(on)) {
+                // register endpoint as mbean
+                Object me = context.getManagementStrategy().getManagementObjectStrategy().getManagedObjectForEndpoint(context, endpoint);
+                context.getManagementStrategy().getManagementAgent().register(me, on);
+            }
+            return true;
+        } else {
+            return false;
+        }
     }
 
     /**
@@ -268,6 +281,7 @@ public class ManagedCamelContext {
      */
     @ManagedOperation(description = "Removes endpoints by the given pattern")
     public int removeEndpoints(String pattern) throws Exception {
+        // endpoints are always removed from JMX when removed from the context
         Collection<Endpoint> removed = context.removeEndpoints(pattern);
         if (removed == null) {
             return 0;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java Sun Jun 26 14:50:15 2011
@@ -230,6 +230,8 @@ public interface ManagementAgent extends
 
     /**
      * Whether to register mbeans when starting a new route
+     * <p/>
+     * This option is <tt>true</tt> by default.
      *
      * @return <tt>true</tt> to register when starting a new route
      */
@@ -237,6 +239,8 @@ public interface ManagementAgent extends
 
     /**
      * Whether to register mbeans when starting a new route
+     * <p/>
+     * This option is <tt>true</tt> by default.
      *
      * @param registerNewRoutes <tt>true</tt> to register when starting a new route
      */

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java?rev=1139800&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java Sun Jun 26 14:50:15 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.model.ProcessorDefinition;
+
+/**
+ * Strategy for creating the managed objects for the various beans Camel registers for management.
+ */
+public interface ManagementObjectStrategy {
+    
+    Object getManagedObjectForCamelContext(CamelContext context);
+
+    Object getManagedObjectForComponent(CamelContext context, Component component, String name);
+
+    Object getManagedObjectForEndpoint(CamelContext context, Endpoint endpoint);
+
+    Object getManagedObjectForErrorHandler(CamelContext context, RouteContext routeContext,
+                                           Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder);
+
+    Object getManagedObjectForRoute(CamelContext context, Route route);
+
+    Object getManagedObjectForConsumer(CamelContext context, Consumer consumer);
+
+    Object getManagedObjectForProducer(CamelContext context, Producer producer);
+
+    Object getManagedObjectForProcessor(CamelContext context, Processor processor,
+                                        ProcessorDefinition definition, Route route);
+
+    Object getManagedObjectForService(CamelContext context, Service service);
+
+    Object getManagedObjectForThreadPool(CamelContext context, ThreadPoolExecutor threadPool,
+                                         String id, String sourceId, String routeId, String threadPoolProfileId);
+
+    Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier);
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java Sun Jun 26 14:50:15 2011
@@ -96,6 +96,20 @@ public interface ManagementStrategy exte
     void setManagementNamingStrategy(ManagementNamingStrategy strategy);
 
     /**
+     * Gets the object strategy to use
+     *
+     * @return object strategy
+     */
+    ManagementObjectStrategy getManagementObjectStrategy();
+
+    /**
+     * Sets the object strategy to use
+     *
+     * @param strategy object strategy
+     */
+    void setManagementObjectStrategy(ManagementObjectStrategy strategy);
+
+    /**
      * Gets the management agent
      *
      * @return management agent

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java Sun Jun 26 14:50:15 2011
@@ -95,9 +95,16 @@ public class ManagedCamelContextTest ext
 
         assertNotNull(context.hasEndpoint("seda:bar"));
 
+        ObjectName seda = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://bar\"");
+        boolean registered = mbeanServer.isRegistered(seda);
+        assertTrue("Should be registered " + seda, registered);
+
         // create it again
         reply = mbeanServer.invoke(on, "createEndpoint", new Object[]{"seda:bar"}, new String[]{"java.lang.String"});
         assertEquals(Boolean.FALSE, reply);
+
+        registered = mbeanServer.isRegistered(seda);
+        assertTrue("Should be registered " + seda, registered);
     }
 
     public void testManagedCamelContextRemoveEndpoint() throws Exception {
@@ -113,17 +120,25 @@ public class ManagedCamelContextTest ext
 
         assertNotNull(context.hasEndpoint("seda:bar"));
 
+        ObjectName seda = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://bar\"");
+        boolean registered = mbeanServer.isRegistered(seda);
+        assertTrue("Should be registered " + seda, registered);
+
         // remove it
         Object num = mbeanServer.invoke(on, "removeEndpoints", new Object[]{"seda:*"}, new String[]{"java.lang.String"});
         assertEquals(1, num);
 
         assertNull(context.hasEndpoint("seda:bar"));
+        registered = mbeanServer.isRegistered(seda);
+        assertFalse("Should not be registered " + seda, registered);
 
         // remove it again
         num = mbeanServer.invoke(on, "removeEndpoints", new Object[]{"seda:*"}, new String[]{"java.lang.String"});
         assertEquals(0, num);
 
         assertNull(context.hasEndpoint("seda:bar"));
+        registered = mbeanServer.isRegistered(seda);
+        assertFalse("Should not be registered " + seda, registered);
     }
 
     @Override