You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/26 16:50:16 UTC
svn commit: r1139800 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/management/
main/java/org/apache/camel/management/mbean/
main/java/org/apache/camel/spi/ test/java/org/apache/camel/management/
Author: davsclaus
Date: Sun Jun 26 14:50:15 2011
New Revision: 1139800
URL: http://svn.apache.org/viewvc?rev=1139800&view=rev
Log:
CAMEL-4156: Introduced ManagmentObjectStrategy as a SPI to make it easier to reuse acutal mbeans used by Camel, and allow end users to plugin custom strategy if needed.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Sun Jun 26 14:50:15 2011
@@ -40,37 +40,18 @@ import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.VetoCamelContextStartException;
import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.component.bean.BeanProcessor;
import org.apache.camel.impl.ConsumerCache;
import org.apache.camel.impl.DefaultCamelContextNameStrategy;
import org.apache.camel.impl.EndpointRegistry;
import org.apache.camel.impl.EventDrivenConsumerRoute;
import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
import org.apache.camel.impl.ProducerCache;
-import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
-import org.apache.camel.management.mbean.ManagedBeanProcessor;
-import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
-import org.apache.camel.management.mbean.ManagedCamelContext;
-import org.apache.camel.management.mbean.ManagedComponent;
-import org.apache.camel.management.mbean.ManagedConsumer;
import org.apache.camel.management.mbean.ManagedConsumerCache;
-import org.apache.camel.management.mbean.ManagedDelayer;
import org.apache.camel.management.mbean.ManagedEndpoint;
import org.apache.camel.management.mbean.ManagedEndpointRegistry;
-import org.apache.camel.management.mbean.ManagedErrorHandler;
-import org.apache.camel.management.mbean.ManagedEventNotifier;
-import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.management.mbean.ManagedProcessor;
-import org.apache.camel.management.mbean.ManagedProducer;
import org.apache.camel.management.mbean.ManagedProducerCache;
-import org.apache.camel.management.mbean.ManagedRoute;
-import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
-import org.apache.camel.management.mbean.ManagedSendProcessor;
import org.apache.camel.management.mbean.ManagedService;
-import org.apache.camel.management.mbean.ManagedSuspendableRoute;
-import org.apache.camel.management.mbean.ManagedThreadPool;
-import org.apache.camel.management.mbean.ManagedThrottler;
import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
import org.apache.camel.management.mbean.ManagedTracer;
import org.apache.camel.model.AOPDefinition;
@@ -80,19 +61,13 @@ import org.apache.camel.model.OnExceptio
import org.apache.camel.model.PolicyDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.processor.Delayer;
-import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.processor.ErrorHandler;
-import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.processor.Throttler;
import org.apache.camel.processor.interceptor.Tracer;
-import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.CamelContextNameStrategy;
import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementAgent;
import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.spi.ManagementObjectStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.UnitOfWork;
@@ -134,8 +109,7 @@ public class DefaultManagementLifecycleS
}
public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
- ManagedCamelContext mc = new ManagedCamelContext(context);
- mc.init(context.getManagementStrategy());
+ Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
String managementName = context.getManagementName() != null ? context.getManagementName() : context.getName();
@@ -199,7 +173,7 @@ public class DefaultManagementLifecycleS
initialized = true;
}
- private String findFreeName(ManagedCamelContext mc, CamelContextNameStrategy strategy, String managementName) throws MalformedObjectNameException {
+ private String findFreeName(Object mc, CamelContextNameStrategy strategy, String managementName) throws MalformedObjectNameException {
boolean done = false;
String name = null;
// start from 2 as the existing name is considered the 1st
@@ -228,8 +202,7 @@ public class DefaultManagementLifecycleS
return;
}
try {
- ManagedCamelContext mc = new ManagedCamelContext(context);
- mc.init(context.getManagementStrategy());
+ Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
// the context could have been removed already
if (getManagementStrategy().isManaged(mc, null)) {
getManagementStrategy().unmanageObject(mc);
@@ -245,7 +218,7 @@ public class DefaultManagementLifecycleS
return;
}
try {
- Object mc = getManagedObjectForComponent(name, component);
+ Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);;
getManagementStrategy().manageObject(mc);
} catch (Exception e) {
LOG.warn("Could not register Component MBean", e);
@@ -258,24 +231,13 @@ public class DefaultManagementLifecycleS
return;
}
try {
- Object mc = getManagedObjectForComponent(name, component);
+ Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);;
getManagementStrategy().unmanageObject(mc);
} catch (Exception e) {
LOG.warn("Could not unregister Component MBean", e);
}
}
- @SuppressWarnings("unchecked")
- private Object getManagedObjectForComponent(String name, Component component) {
- if (component instanceof ManagementAware) {
- return ((ManagementAware) component).getManagedObject(component);
- } else {
- ManagedComponent mc = new ManagedComponent(name, component);
- mc.init(getManagementStrategy());
- return mc;
- }
- }
-
/**
* If the endpoint is an instance of ManagedResource then register it with the
* mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
@@ -290,12 +252,12 @@ public class DefaultManagementLifecycleS
}
try {
- Object managedObject = getManagedObjectForEndpoint(endpoint);
- if (managedObject == null) {
+ Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
+ if (me == null) {
// endpoint should not be managed
return;
}
- getManagementStrategy().manageObject(managedObject);
+ getManagementStrategy().manageObject(me);
} catch (Exception e) {
LOG.warn("Could not register Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
}
@@ -308,33 +270,13 @@ public class DefaultManagementLifecycleS
}
try {
- Object me = getManagedObjectForEndpoint(endpoint);
+ Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
getManagementStrategy().unmanageObject(me);
} catch (Exception e) {
LOG.warn("Could not unregister Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
}
}
- @SuppressWarnings("unchecked")
- private Object getManagedObjectForEndpoint(Endpoint endpoint) {
- // we only want to manage singleton endpoints
- if (!endpoint.isSingleton()) {
- return null;
- }
-
- if (endpoint instanceof ManagementAware) {
- return ((ManagementAware) endpoint).getManagedObject(endpoint);
- } else if (endpoint instanceof BrowsableEndpoint) {
- ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
- me.init(getManagementStrategy());
- return me;
- } else {
- ManagedEndpoint me = new ManagedEndpoint(endpoint);
- me.init(getManagementStrategy());
- return me;
- }
- }
-
public void onServiceAdd(CamelContext context, Service service, Route route) {
// services can by any kind of misc type but also processors
// so we have special logic when its a processor
@@ -386,7 +328,7 @@ public class DefaultManagementLifecycleS
return null;
}
- ManagedService answer = null;
+ Object answer = null;
if (service instanceof ManagementAware) {
return ((ManagementAware) service).getManagedObject(service);
@@ -396,18 +338,13 @@ public class DefaultManagementLifecycleS
mt.init(getManagementStrategy());
return mt;
} else if (service instanceof EventNotifier) {
- // special for event notifier
- ManagedEventNotifier men = new ManagedEventNotifier(context, (EventNotifier) service);
- men.init(getManagementStrategy());
- return men;
+ answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
} else if (service instanceof Producer) {
- answer = new ManagedProducer(context, (Producer) service);
- } else if (service instanceof ScheduledPollConsumer) {
- answer = new ManagedScheduledPollConsumer(context, (ScheduledPollConsumer) service);
+ answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
} else if (service instanceof Consumer) {
- answer = new ManagedConsumer(context, (Consumer) service);
+ answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
} else if (service instanceof Processor) {
- // special for processors
+ // special for processors as we need to do some extra work
return getManagedObjectForProcessor(context, (Processor) service, route);
} else if (service instanceof ThrottlingInflightRoutePolicy) {
answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
@@ -419,16 +356,16 @@ public class DefaultManagementLifecycleS
answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
} else if (service != null) {
// fallback as generic service
- answer = new ManagedService(context, service);
+ answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
}
- if (answer != null) {
- answer.setRoute(route);
- answer.init(getManagementStrategy());
+ if (answer != null && answer instanceof ManagedService) {
+ ManagedService ms = (ManagedService) answer;
+ ms.setRoute(route);
+ ms.init(getManagementStrategy());
return answer;
} else {
- // not supported
- return null;
+ return answer;
}
}
@@ -443,7 +380,7 @@ public class DefaultManagementLifecycleS
}
// get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
- Object managedObject = createManagedObjectForProcessor(context, processor, holder.getKey(), route);
+ Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
// only manage if we have a name for it as otherwise we do not want to manage it anyway
if (managedObject != null) {
// is it a performance counter then we need to set our counter
@@ -451,7 +388,7 @@ public class DefaultManagementLifecycleS
InstrumentationProcessor counter = holder.getValue();
if (counter != null) {
// change counter to us
- counter.setCounter((ManagedPerformanceCounter) managedObject);
+ counter.setCounter(managedObject);
}
}
}
@@ -459,59 +396,6 @@ public class DefaultManagementLifecycleS
return managedObject;
}
- @SuppressWarnings("unchecked")
- private Object createManagedObjectForProcessor(CamelContext context, Processor processor,
- ProcessorDefinition definition, Route route) {
- ManagedProcessor answer = null;
-
- // unwrap delegates as we want the real target processor
- Processor target = processor;
- while (target != null) {
-
- // skip error handlers
- if (target instanceof ErrorHandler) {
- return false;
- }
-
- // look for specialized processor which we should prefer to use
- if (target instanceof Delayer) {
- answer = new ManagedDelayer(context, (Delayer) target, definition);
- } else if (target instanceof Throttler) {
- answer = new ManagedThrottler(context, (Throttler) target, definition);
- } else if (target instanceof SendProcessor) {
- answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
- } else if (target instanceof BeanProcessor) {
- answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
- } else if (target instanceof ManagementAware) {
- return ((ManagementAware) target).getManagedObject(processor);
- }
-
- if (answer != null) {
- // break out as we found an answer
- break;
- }
-
- // no answer yet, so unwrap any delegates and try again
- if (target instanceof DelegateProcessor) {
- target = ((DelegateProcessor) target).getProcessor();
- } else if (target instanceof DelegateAsyncProcessor) {
- target = ((DelegateAsyncProcessor) target).getProcessor();
- } else {
- // no delegate so we dont have any target to try next
- break;
- }
- }
-
- if (answer == null) {
- // fallback to a generic processor
- answer = new ManagedProcessor(context, target, definition);
- }
-
- answer.setRoute(route);
- answer.init(getManagementStrategy());
- return answer;
- }
-
public void onRoutesAdd(Collection<Route> routes) {
for (Route route : routes) {
@@ -529,13 +413,7 @@ public class DefaultManagementLifecycleS
continue;
}
- ManagedRoute mr;
- if (route.supportsSuspension()) {
- mr = new ManagedSuspendableRoute(camelContext, route);
- } else {
- mr = new ManagedRoute(camelContext, route);
- }
- mr.init(getManagementStrategy());
+ Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
// skip already managed routes, for example if the route has been restarted
if (getManagementStrategy().isManaged(mr, null)) {
@@ -571,8 +449,7 @@ public class DefaultManagementLifecycleS
}
for (Route route : routes) {
- ManagedRoute mr = new ManagedRoute(camelContext, route);
- mr.init(getManagementStrategy());
+ Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
// skip unmanaged routes
if (!getManagementStrategy().isManaged(mr, null)) {
@@ -594,8 +471,7 @@ public class DefaultManagementLifecycleS
return;
}
- ManagedErrorHandler me = new ManagedErrorHandler(routeContext, errorHandler, errorHandlerBuilder);
- me.init(getManagementStrategy());
+ Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
// skip already managed services, for example if a route has been restarted
if (getManagementStrategy().isManaged(me, null)) {
@@ -618,8 +494,7 @@ public class DefaultManagementLifecycleS
return;
}
- ManagedThreadPool mtp = new ManagedThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
- mtp.init(getManagementStrategy());
+ Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
// skip already managed services, for example if a route has been restarted
if (getManagementStrategy().isManaged(mtp, null)) {
@@ -723,9 +598,15 @@ public class DefaultManagementLifecycleS
}
private ManagementStrategy getManagementStrategy() {
+ ObjectHelper.notNull(camelContext, "CamelContext");
return camelContext.getManagementStrategy();
}
+ private ManagementObjectStrategy getManagementObjectStrategy() {
+ ObjectHelper.notNull(camelContext, "CamelContext");
+ return camelContext.getManagementStrategy().getManagementObjectStrategy();
+ }
+
public void start() throws Exception {
ObjectHelper.notNull(camelContext, "CamelContext");
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1139800&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Sun Jun 26 14:50:15 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedBeanProcessor;
+import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
+import org.apache.camel.management.mbean.ManagedCamelContext;
+import org.apache.camel.management.mbean.ManagedComponent;
+import org.apache.camel.management.mbean.ManagedConsumer;
+import org.apache.camel.management.mbean.ManagedDelayer;
+import org.apache.camel.management.mbean.ManagedEndpoint;
+import org.apache.camel.management.mbean.ManagedErrorHandler;
+import org.apache.camel.management.mbean.ManagedEventNotifier;
+import org.apache.camel.management.mbean.ManagedProcessor;
+import org.apache.camel.management.mbean.ManagedProducer;
+import org.apache.camel.management.mbean.ManagedRoute;
+import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedSendProcessor;
+import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedSuspendableRoute;
+import org.apache.camel.management.mbean.ManagedThreadPool;
+import org.apache.camel.management.mbean.ManagedThrottler;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.Delayer;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.ErrorHandler;
+import org.apache.camel.processor.SendProcessor;
+import org.apache.camel.processor.Throttler;
+import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.spi.EventNotifier;
+import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.spi.ManagementObjectStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ *
+ */
+public class DefaultManagementObjectStrategy implements ManagementObjectStrategy {
+
+ public Object getManagedObjectForCamelContext(CamelContext context) {
+ ManagedCamelContext mc = new ManagedCamelContext(context);
+ mc.init(context.getManagementStrategy());
+ return mc;
+ }
+
+ public Object getManagedObjectForComponent(CamelContext context, Component component, String name) {
+ if (component instanceof ManagementAware) {
+ return ((ManagementAware) component).getManagedObject(component);
+ } else {
+ ManagedComponent mc = new ManagedComponent(name, component);
+ mc.init(context.getManagementStrategy());
+ return mc;
+ }
+ }
+
+ public Object getManagedObjectForEndpoint(CamelContext context, Endpoint endpoint) {
+ // we only want to manage singleton endpoints
+ if (!endpoint.isSingleton()) {
+ return null;
+ }
+
+ if (endpoint instanceof ManagementAware) {
+ return ((ManagementAware) endpoint).getManagedObject(endpoint);
+ } else if (endpoint instanceof BrowsableEndpoint) {
+ ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
+ me.init(context.getManagementStrategy());
+ return me;
+ } else {
+ ManagedEndpoint me = new ManagedEndpoint(endpoint);
+ me.init(context.getManagementStrategy());
+ return me;
+ }
+ }
+
+ public Object getManagedObjectForErrorHandler(CamelContext context, RouteContext routeContext,
+ Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder) {
+ ManagedErrorHandler me = new ManagedErrorHandler(routeContext, errorHandler, errorHandlerBuilder);
+ me.init(context.getManagementStrategy());
+ return me;
+ }
+
+ public Object getManagedObjectForRoute(CamelContext context, Route route) {
+ ManagedRoute mr;
+ if (route.supportsSuspension()) {
+ mr = new ManagedSuspendableRoute(context, route);
+ } else {
+ mr = new ManagedRoute(context, route);
+ }
+ mr.init(context.getManagementStrategy());
+ return mr;
+ }
+
+ public Object getManagedObjectForThreadPool(CamelContext context, ThreadPoolExecutor threadPool,
+ String id, String sourceId, String routeId, String threadPoolProfileId) {
+ ManagedThreadPool mtp = new ManagedThreadPool(context, threadPool, id, sourceId, routeId, threadPoolProfileId);
+ mtp.init(context.getManagementStrategy());
+ return mtp;
+ }
+
+ public Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier) {
+ ManagedEventNotifier men = new ManagedEventNotifier(context, eventNotifier);
+ men.init(context.getManagementStrategy());
+ return men;
+ }
+
+ public Object getManagedObjectForConsumer(CamelContext context, Consumer consumer) {
+ ManagedConsumer mc;
+ if (consumer instanceof ScheduledPollConsumer) {
+ mc = new ManagedScheduledPollConsumer(context, (ScheduledPollConsumer) consumer);
+ } else {
+ mc = new ManagedConsumer(context, consumer);
+ }
+ mc.init(context.getManagementStrategy());
+ return mc;
+ }
+
+ public Object getManagedObjectForProducer(CamelContext context, Producer producer) {
+ ManagedProducer mp = new ManagedProducer(context, producer);
+ mp.init(context.getManagementStrategy());
+ return mp;
+ }
+
+ public Object getManagedObjectForService(CamelContext context, Service service) {
+ ManagedService mc = new ManagedService(context, service);
+ mc.init(context.getManagementStrategy());
+ return mc;
+ }
+
+ public Object getManagedObjectForProcessor(CamelContext context, Processor processor,
+ ProcessorDefinition definition, Route route) {
+ ManagedProcessor answer = null;
+
+ // unwrap delegates as we want the real target processor
+ Processor target = processor;
+ while (target != null) {
+
+ // skip error handlers
+ if (target instanceof ErrorHandler) {
+ return false;
+ }
+
+ // look for specialized processor which we should prefer to use
+ if (target instanceof Delayer) {
+ answer = new ManagedDelayer(context, (Delayer) target, definition);
+ } else if (target instanceof Throttler) {
+ answer = new ManagedThrottler(context, (Throttler) target, definition);
+ } else if (target instanceof SendProcessor) {
+ answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+ } else if (target instanceof BeanProcessor) {
+ answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
+ } else if (target instanceof ManagementAware) {
+ return ((ManagementAware) target).getManagedObject(processor);
+ }
+
+ if (answer != null) {
+ // break out as we found an answer
+ break;
+ }
+
+ // no answer yet, so unwrap any delegates and try again
+ if (target instanceof DelegateProcessor) {
+ target = ((DelegateProcessor) target).getProcessor();
+ } else if (target instanceof DelegateAsyncProcessor) {
+ target = ((DelegateAsyncProcessor) target).getProcessor();
+ } else {
+ // no delegate so we dont have any target to try next
+ break;
+ }
+ }
+
+ if (answer == null) {
+ // fallback to a generic processor
+ answer = new ManagedProcessor(context, target, definition);
+ }
+
+ answer.setRoute(route);
+ answer.init(context.getManagementStrategy());
+ return answer;
+ }
+
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java Sun Jun 26 14:50:15 2011
@@ -28,6 +28,7 @@ import org.apache.camel.spi.EventFactory
import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.ManagementAgent;
import org.apache.camel.spi.ManagementNamingStrategy;
+import org.apache.camel.spi.ManagementObjectStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.util.ServiceHelper;
import org.fusesource.commons.management.Statistic;
@@ -50,6 +51,7 @@ public class DefaultManagementStrategy i
private List<EventNotifier> eventNotifiers = new ArrayList<EventNotifier>();
private EventFactory eventFactory = new DefaultEventFactory();
private ManagementNamingStrategy managementNamingStrategy;
+ private ManagementObjectStrategy managementObjectStrategy;
private boolean onlyManageProcessorWithCustomId;
private ManagementAgent managementAgent;
private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.All;
@@ -90,6 +92,17 @@ public class DefaultManagementStrategy i
this.managementNamingStrategy = managementNamingStrategy;
}
+ public ManagementObjectStrategy getManagementObjectStrategy() {
+ if (managementObjectStrategy == null) {
+ managementObjectStrategy = new DefaultManagementObjectStrategy();
+ }
+ return managementObjectStrategy;
+ }
+
+ public void setManagementObjectStrategy(ManagementObjectStrategy managementObjectStrategy) {
+ this.managementObjectStrategy = managementObjectStrategy;
+ }
+
public ManagementAgent getManagementAgent() {
return managementAgent;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Sun Jun 26 14:50:15 2011
@@ -48,11 +48,16 @@ public class InstrumentationProcessor ex
return "Instrumentation" + (type != null ? ":" + type : "") + "[" + processor + "]";
}
- public void setCounter(ManagedPerformanceCounter counter) {
+ public void setCounter(Object counter) {
+ ManagedPerformanceCounter mpc = null;
+ if (counter instanceof ManagedPerformanceCounter) {
+ mpc = (ManagedPerformanceCounter) counter;
+ }
+
if (this.counter instanceof DelegatePerformanceCounter) {
- ((DelegatePerformanceCounter) this.counter).setCounter(counter);
+ ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
} else {
- this.counter = counter;
+ this.counter = mpc;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Sun Jun 26 14:50:15 2011
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.management.ObjectName;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -253,9 +254,21 @@ public class ManagedCamelContext {
// endpoint already exists
return false;
}
- // create new endpoint by just getting it
- context.getEndpoint(uri);
- return true;
+
+ Endpoint endpoint = context.getEndpoint(uri);
+ if (endpoint != null) {
+ // ensure endpoint is registered, as the management strategy could have been configured to not always
+ // register new endpoints in JMX, so we need to check if its registered, and if not register it manually
+ ObjectName on = context.getManagementStrategy().getManagementNamingStrategy().getObjectNameForEndpoint(endpoint);
+ if (on != null && !context.getManagementStrategy().getManagementAgent().isRegistered(on)) {
+ // register endpoint as mbean
+ Object me = context.getManagementStrategy().getManagementObjectStrategy().getManagedObjectForEndpoint(context, endpoint);
+ context.getManagementStrategy().getManagementAgent().register(me, on);
+ }
+ return true;
+ } else {
+ return false;
+ }
}
/**
@@ -268,6 +281,7 @@ public class ManagedCamelContext {
*/
@ManagedOperation(description = "Removes endpoints by the given pattern")
public int removeEndpoints(String pattern) throws Exception {
+ // endpoints is always removed from JMX if removed from context
Collection<Endpoint> removed = context.removeEndpoints(pattern);
if (removed == null) {
return 0;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementAgent.java Sun Jun 26 14:50:15 2011
@@ -230,6 +230,8 @@ public interface ManagementAgent extends
/**
* Whether to register mbeans when starting a new route
+ * <p/>
+ * This option is default <tt>true</tt>.
*
* @return <tt>true</tt> to register when starting a new route
*/
@@ -237,6 +239,8 @@ public interface ManagementAgent extends
/**
* Whether to register mbeans when starting a new route
+ * <p/>
+ * This option is default <tt>true</tt>.
*
* @param registerNewRoutes <tt>true</tt> to register when starting a new route
*/
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java?rev=1139800&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java Sun Jun 26 14:50:15 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.model.ProcessorDefinition;
+
+/**
+ * Strategy for creating the managed object for the various beans Camel register for management.
+ */
+public interface ManagementObjectStrategy {
+
+ Object getManagedObjectForCamelContext(CamelContext context);
+
+ Object getManagedObjectForComponent(CamelContext context, Component component, String name);
+
+ Object getManagedObjectForEndpoint(CamelContext context, Endpoint endpoint);
+
+ Object getManagedObjectForErrorHandler(CamelContext context, RouteContext routeContext,
+ Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder);
+
+ Object getManagedObjectForRoute(CamelContext context, Route route);
+
+ Object getManagedObjectForConsumer(CamelContext context, Consumer consumer);
+
+ Object getManagedObjectForProducer(CamelContext context, Producer producer);
+
+ Object getManagedObjectForProcessor(CamelContext context, Processor processor,
+ ProcessorDefinition definition, Route route);
+
+ Object getManagedObjectForService(CamelContext context, Service service);
+
+ Object getManagedObjectForThreadPool(CamelContext context, ThreadPoolExecutor threadPool,
+ String id, String sourceId, String routeId, String threadPoolProfileId);
+
+ Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier);
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java Sun Jun 26 14:50:15 2011
@@ -96,6 +96,20 @@ public interface ManagementStrategy exte
void setManagementNamingStrategy(ManagementNamingStrategy strategy);
/**
+ * Gets the object strategy to use
+ *
+ * @return object strategy
+ */
+ ManagementObjectStrategy getManagementObjectStrategy();
+
+ /**
+ * Sets the object strategy to use
+ *
+ * @param strategy object strategy
+ */
+ void setManagementObjectStrategy(ManagementObjectStrategy strategy);
+
+ /**
* Gets the management agent
*
* @return management agent
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java?rev=1139800&r1=1139799&r2=1139800&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java Sun Jun 26 14:50:15 2011
@@ -95,9 +95,16 @@ public class ManagedCamelContextTest ext
assertNotNull(context.hasEndpoint("seda:bar"));
+ ObjectName seda = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://bar\"");
+ boolean registered = mbeanServer.isRegistered(seda);
+ assertTrue("Should be registered " + seda, registered);
+
// create it again
reply = mbeanServer.invoke(on, "createEndpoint", new Object[]{"seda:bar"}, new String[]{"java.lang.String"});
assertEquals(Boolean.FALSE, reply);
+
+ registered = mbeanServer.isRegistered(seda);
+ assertTrue("Should be registered " + seda, registered);
}
public void testManagedCamelContextRemoveEndpoint() throws Exception {
@@ -113,17 +120,25 @@ public class ManagedCamelContextTest ext
assertNotNull(context.hasEndpoint("seda:bar"));
+ ObjectName seda = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://bar\"");
+ boolean registered = mbeanServer.isRegistered(seda);
+ assertTrue("Should be registered " + seda, registered);
+
// remove it
Object num = mbeanServer.invoke(on, "removeEndpoints", new Object[]{"seda:*"}, new String[]{"java.lang.String"});
assertEquals(1, num);
assertNull(context.hasEndpoint("seda:bar"));
+ registered = mbeanServer.isRegistered(seda);
+ assertFalse("Should not be registered " + seda, registered);
// remove it again
num = mbeanServer.invoke(on, "removeEndpoints", new Object[]{"seda:*"}, new String[]{"java.lang.String"});
assertEquals(0, num);
assertNull(context.hasEndpoint("seda:bar"));
+ registered = mbeanServer.isRegistered(seda);
+ assertFalse("Should not be registered " + seda, registered);
}
@Override