You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/04/24 13:04:49 UTC

svn commit: r768258 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/management/ main/java/org/apache/camel/model/ test/java/org/apache/camel/management/

Author: davsclaus
Date: Fri Apr 24 11:04:48 2009
New Revision: 768258

URL: http://svn.apache.org/viewvc?rev=768258&view=rev
Log:
CAMEL-1562: Routes are now also JMX instrumented. But this time they do not alter the route at all. So the runtime route model is the same whether JMX is enabled or not. 

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java Fri Apr 24 11:04:48 2009
@@ -51,6 +51,7 @@
         if (counter != null) {
             InstrumentationProcessor wrapper = new InstrumentationProcessor(counter);
             wrapper.setProcessor(target);
+            wrapper.setType(processorDefinition.getShortName());
             // remove to not double wrap it
             registeredCounters.remove(processorDefinition);
             return wrapper;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java Fri Apr 24 11:04:48 2009
@@ -27,6 +27,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -35,6 +36,7 @@
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.InstrumentationAgent;
+import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
@@ -53,11 +55,7 @@
     private InstrumentationAgent agent;
     private CamelNamingStrategy namingStrategy;
     private boolean initialized;
-
-    // A map (Endpoint -> InstrumentationProcessor) to facilitate
-    // adding per-route interceptor and registering ManagedRoute MBean
-    private final Map<Endpoint, InstrumentationProcessor> interceptorMap =
-        new HashMap<Endpoint, InstrumentationProcessor>();
+    private final Map<Endpoint, InstrumentationProcessor> registeredRoutes = new HashMap<Endpoint, InstrumentationProcessor>();
 
     public InstrumentationLifecycleStrategy() {
         this(new DefaultInstrumentationAgent());
@@ -165,26 +163,22 @@
             return;
         }
 
-        // TODO: Disabled for now until we find a better strategy for registering routes in the JMX
-        // without altering the route model. The route model should be much the same as without JMX to avoid
-        // a gap that causes pain to get working with and without JMX enabled. We have seen to many issues with this already.
-/*
         for (Route route : routes) {
             try {
                 ManagedRoute mr = new ManagedRoute(route);
                 // retrieve the per-route intercept for this route
-                InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
-                if (interceptor == null) {
-                    LOG.warn("Instrumentation processor not found for route endpoint: " + route.getEndpoint());
+                InstrumentationProcessor processor = registeredRoutes.get(route.getEndpoint());
+                if (processor == null) {
+                    LOG.warn("Route has not been instrumented for endpoint: " + route.getEndpoint());
                 } else {
-                    interceptor.setCounter(mr);
+                    // let the instrumentation use our route counter
+                    processor.setCounter(mr);
                 }
                 agent.register(mr, getNamingStrategy().getObjectName(mr));
             } catch (JMException e) {
                 LOG.warn("Could not register Route MBean", e);
             }
         }
-*/
     }
 
     public void onServiceAdd(CamelContext context, Service service) {
@@ -223,6 +217,11 @@
         // by InstrumentationInterceptStrategy.
         RouteDefinition route = routeContext.getRoute();
 
+        // TODO: This only registers counters for the first-level outputs of the route;
+        // the children of those outputs are not registered.
+        // We should leverage the Channel for this to ensure we register all processors
+        // in the entire route graph
+
         // register all processors
         for (ProcessorDefinition processor : route.getOutputs()) {
             ObjectName name = null;
@@ -244,53 +243,33 @@
         }
 
         // add intercept strategy that executes the JMX instrumentation for performance metrics
+        // TODO: We could do as below with an inlined implementation instead of a separate class
         routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters));
 
-        // Add an InstrumentationProcessor at the beginning of each route and
-        // set up the interceptorMap for onRoutesAdd() method to register the
-        // ManagedRoute MBeans.
-
-        // TODO: Disabled for now until we find a better strategy for registering routes in the JMX
-        // without altering the route model. The route model should be much the same as without JMX to avoid
-        // a gap that causes pain to get working with and without JMX enabled. We have seen to many issues with this already.
-
-/*        RouteDefinition routeType = routeContext.getRoute();
-        if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
-            if (routeType.getInputs().size() > 1) {
-                LOG.warn("Addding InstrumentationProcessor to first input only.");
-            }
-
-            Endpoint endpoint  = routeType.getInputs().get(0).getEndpoint();
+        // instrument the route endpoint
+        final Endpoint endpoint = routeContext.getEndpoint();
 
-            List<ProcessorDefinition> exceptionHandlers = new ArrayList<ProcessorDefinition>();
-            List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
+        // only needed to register on the first output as all routes will pass through this one
+        ProcessorDefinition out = routeContext.getRoute().getOutputs().get(0);
 
-            // separate out the exception handers in the outputs
-            for (ProcessorDefinition output : routeType.getOutputs()) {
-                if (output instanceof OnExceptionDefinition) {
-                    exceptionHandlers.add(output);
-                } else {
-                    outputs.add(output);
+        // add an intercept strategy that counts when the route sends to any of its outputs
+        out.addInterceptStrategy(new InterceptStrategy() {
+            public Processor wrapProcessorInInterceptors(ProcessorDefinition processorDefinition, Processor target) throws Exception {
+                if (registeredRoutes.containsKey(endpoint)) {
+                    // do not double wrap
+                    return target;
                 }
-            }
-
-            // clearing the outputs
-            routeType.clearOutput();
+                InstrumentationProcessor wrapper = new InstrumentationProcessor(null);
+                wrapper.setType(processorDefinition.getShortName());
+                wrapper.setProcessor(target);
 
-            // add exception handlers as top children
-            routeType.getOutputs().addAll(exceptionHandlers);
+                // register our wrapper
+                registeredRoutes.put(endpoint, wrapper);
 
-            // add an interceptor to instrument the route
-            InstrumentationProcessor processor = new InstrumentationProcessor();
-            routeType.intercept(processor);
-
-            // add the output
-            for (ProcessorDefinition processorType : outputs) {
-                routeType.addOutput(processorType);
+                return wrapper;
             }
+        });
 
-            interceptorMap.put(endpoint, processor);
-        }*/
     }
 
     public CamelNamingStrategy getNamingStrategy() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Fri Apr 24 11:04:48 2009
@@ -34,6 +34,7 @@
 
     private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
     private PerformanceCounter counter;
+    private String type;
 
     public InstrumentationProcessor(PerformanceCounter counter) {
         this.counter = counter;
@@ -44,7 +45,7 @@
     
     @Override
     public String toString() {
-        return "Instrumentation[" + processor + "]";
+        return "Instrumention" + (type != null ? ":" + type : "") + "[" + processor + "]";
     }
 
     public void setCounter(PerformanceCounter counter) {
@@ -102,4 +103,11 @@
         }
     }
 
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java Fri Apr 24 11:04:48 2009
@@ -23,6 +23,7 @@
  * @version $Revision$
  */
 public class NodeFactory {
+
     public FilterDefinition createFilter() {
         return new FilterDefinition();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Fri Apr 24 11:04:48 2009
@@ -53,6 +53,7 @@
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TransactedPolicy;
@@ -70,9 +71,10 @@
     private static final transient Log LOG = LogFactory.getLog(ProcessorDefinition.class);
     private ErrorHandlerBuilder errorHandlerBuilder;
     private NodeFactory nodeFactory;
-    private LinkedList<Block> blocks = new LinkedList<Block>();
+    private final LinkedList<Block> blocks = new LinkedList<Block>();
     private ProcessorDefinition parent;
     private String errorHandlerRef;
+    private final List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
 
     // else to use an optional attribute in JAXB2
     public abstract List<ProcessorDefinition> getOutputs();
@@ -130,6 +132,7 @@
         // add interceptor strategies to the channel
         channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies());
         channel.addInterceptStrategies(routeContext.getInterceptStrategies());
+        channel.addInterceptStrategies(this.getInterceptStrategies());
 
         // init the channel
         channel.initChannel(this, routeContext);
@@ -2027,6 +2030,15 @@
         this.nodeFactory = nodeFactory;
     }
 
+    @XmlTransient
+    public List<InterceptStrategy> getInterceptStrategies() {
+        return interceptStrategies;
+    }
+
+    public void addInterceptStrategy(InterceptStrategy strategy) {
+        this.interceptStrategies.add(strategy);
+    }
+
     /**
      * Returns a label to describe this node such as the expression if some kind of expression node
      */

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java Fri Apr 24 11:04:48 2009
@@ -68,8 +68,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: Routes are temporary disabled until the code in InstrumentationLifecycleStrategy is fixed
-        // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
         verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java Fri Apr 24 11:04:48 2009
@@ -42,7 +42,6 @@
         super.tearDown();
     }
 
-
     @Override
     public void testMBeansRegistered() throws Exception {
         if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null
@@ -52,22 +51,17 @@
 
         resolveMandatoryEndpoint("mock:end", MockEndpoint.class);
 
-        Set s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=endpoints,*"), null);
+        Set s = mbsc.queryNames(new ObjectName(domainName + ":type=endpoints,*"), null);
         assertEquals("Could not find 0 endpoints: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=contexts,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=contexts,*"), null);
         assertEquals("Could not find 0 context: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=processors,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), null);
         assertEquals("Could not find 0 processor: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=routes,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), null);
         assertEquals("Could not find 0 route: " + s, 0, s.size());
-
     }
 
     @Override

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java Fri Apr 24 11:04:48 2009
@@ -35,7 +35,6 @@
  * server to conduct the test as connector server is not enabled by default.
  *
  * @version $Revision$
- *
  */
 public class JmxInstrumentationUsingDefaultsTest extends ContextTestSupport {
 
@@ -60,10 +59,8 @@
         s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), null);
         assertEquals("Could not find 1 processor: " + s, 1, s.size());
 
-        // TODO: Routes are temporary disalbed until we get the code in
-        // InstrumentationLifecycleStrategy fixed
-        //s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), null);
-        //assertEquals("Could not find 1 route: " + s, 1, s.size());
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), null);
+        assertEquals("Could not find 1 route: " + s, 1, s.size());
     }
 
     public void testCounters() throws Exception {
@@ -73,8 +70,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: See above
-        //verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
         verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java Fri Apr 24 11:04:48 2009
@@ -60,9 +60,8 @@
         s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), null);
         assertEquals("Could not find 2 processor: " + s, 2, s.size());
 
-        // TODO: Routes are temporary disabled until the code in InstrumentationLifecycleStrategy is fixed
-//        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), null);
-//        assertEquals("Could not find 1 route: " + s, 1, s.size());
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), null);
+        assertEquals("Could not find 1 route: " + s, 1, s.size());
     }
 
     @Override
@@ -73,8 +72,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: see above
-        // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
     }
 
 }