You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/05/17 09:05:30 UTC
svn commit: r657300 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/
main/java/org/apache/camel/model/ main/java/org/apache/camel/spi/
test/java/org/apache/camel/management/
Author: ningjiang
Date: Sat May 17 00:05:30 2008
New Revision: 657300
URL: http://svn.apache.org/viewvc?rev=657300&view=rev
Log:
CAMEL-524 applied patch with thanks to William
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/CamelNamingStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationAgentImpl.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InstrumentationAgent.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java Sat May 17 00:05:30 2008
@@ -35,6 +35,7 @@
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.ProceedProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
+import org.apache.camel.spi.InterceptStrategy;
/**
* The context used to activate new routing rules
@@ -49,6 +50,7 @@
private List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
private Interceptor lastInterceptor;
private CamelContext camelContext;
+ private InterceptStrategy interceptStrategy;
public RouteContext(RouteType route, FromType from, Collection<Route> routes) {
this.route = route;
@@ -177,4 +179,22 @@
return new ProceedProcessor(lastInterceptor);
}
}
+
+ /**
+ * This method retrieves the InterceptStrategy on this route context.
+ *
+ * @return InterceptStrategy
+ */
+ public InterceptStrategy getInterceptStrategy() {
+ return interceptStrategy;
+ }
+
+ /**
+ * This method sets the InterceptStrategy on this route context.
+ *
+ * @param strategy
+ */
+ public void setInterceptStrategy(InterceptStrategy strategy) {
+ interceptStrategy = strategy;
+ }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/CamelNamingStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/CamelNamingStrategy.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/CamelNamingStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/CamelNamingStrategy.java Sat May 17 00:05:30 2008
@@ -27,25 +27,23 @@
import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.impl.RouteContext;
+import org.apache.camel.model.ProcessorType;
import org.apache.camel.model.RouteType;
public class CamelNamingStrategy {
public static final String VALUE_UNKNOWN = "unknown";
- public static final String VALUE_ROUTE = "route";
- public static final String VALUE_STATS = "Stats";
- public static final String VALUE_DEFAULT_BUILDER = "default";
public static final String KEY_NAME = "name";
public static final String KEY_TYPE = "type";
public static final String KEY_CONTEXT = "context";
public static final String KEY_GROUP = "group";
public static final String KEY_COMPONENT = "component";
- public static final String KEY_BUILDER = "builder";
- public static final String KEY_ROUTE_TYPE = "routeType";
public static final String KEY_ROUTE = "route";
- public static final String GROUP_ENDPOINTS = "endpoints";
- public static final String GROUP_SERVICES = "services";
- public static final String GROUP_ROUTES = "routes";
- public static final String GROUP_ROUTE_TYPE = "routeType";
+ public static final String TYPE_CONTEXT = "context";
+ public static final String TYPE_ENDPOINT = "endpoint";
+ public static final String TYPE_PROCESSOR = "processor";
+ public static final String TYPE_ROUTE = "route";
+ public static final String TYPE_SERVICE = "service";
+
protected String domainName;
protected String hostName = "locahost";
@@ -68,7 +66,7 @@
/**
* Implements the naming strategy for a {@link CamelContext}.
* The convention used for a {@link CamelContext} ObjectName is:
- * <tt><domain>:context=<context>,name=camel</tt>
+ * <tt><domain>:context=<context-name>,type=context,name=<context-name></tt>
*
* @param context the camel context
* @return generated ObjectName
@@ -78,14 +76,15 @@
StringBuffer buffer = new StringBuffer();
buffer.append(domainName + ":");
buffer.append(KEY_CONTEXT + "=" + getContextId(context) + ",");
- buffer.append(KEY_NAME + "=" + "context");
+ buffer.append(KEY_TYPE + "=" + TYPE_CONTEXT + ",");
+ buffer.append(KEY_NAME + "=" + getContextId(context));
return createObjectName(buffer);
}
/**
* Implements the naming strategy for a {@link ManagedEndpoint}.
* The convention used for a {@link ManagedEndpoint} ObjectName is:
- * <tt><domain>:context=<context>,type=Services,endpoint=[urlPrefix]localPart</tt>
+ * <tt><domain>:context=<context-name>,type=endpoint,component=<component-name>name=<endpoint-name></tt>
*
* @param mbean
* @return generated ObjectName
@@ -97,7 +96,7 @@
StringBuffer buffer = new StringBuffer();
buffer.append(domainName + ":");
buffer.append(KEY_CONTEXT + "=" + getContextId(ep.getCamelContext()) + ",");
- buffer.append(KEY_GROUP + "=" + GROUP_ENDPOINTS + ",");
+ buffer.append(KEY_TYPE + "=" + TYPE_ENDPOINT + ",");
buffer.append(KEY_COMPONENT + "=" + getComponentId(ep) + ",");
buffer.append(KEY_NAME + "=" + getEndpointId(ep));
return createObjectName(buffer);
@@ -106,7 +105,7 @@
/**
* Implements the naming strategy for a {@link org.apache.camel.impl.ServiceSupport Service}.
* The convention used for a {@link org.apache.camel.Service Service} ObjectName is
- * <tt><domain>:context=<context>,type=Services,endpoint=[urlPrefix]localPart</tt>
+ * <tt><domain>:context=<context-name>,type=service,name=<service-name></tt>
*
* @param context the camel context
* @param mbean
@@ -117,7 +116,7 @@
StringBuffer buffer = new StringBuffer();
buffer.append(domainName + ":");
buffer.append(KEY_CONTEXT + "=" + getContextId(context) + ",");
- buffer.append(KEY_GROUP + "=" + GROUP_SERVICES + ",");
+ buffer.append(KEY_TYPE + "=" + TYPE_SERVICE + ",");
buffer.append(KEY_NAME + "=" + Integer.toHexString(mbean.getService().hashCode()));
return createObjectName(buffer);
}
@@ -125,10 +124,10 @@
/**
* Implements the naming strategy for a {@link ManagedRoute}.
- * The convention used for a {@link ManagedEndpoint} ObjectName is:
- * <tt><domain>:context=<context>,type=Routes,endpoint=[urlPrefix]localPart</tt>
+ * The convention used for a {@link ManagedRoute} ObjectName is:
+ * <tt><domain>:context=<context-name>,route=<route-name>,type=route,name=<route-name></tt>
*
- * @param mbean
+ * @param mbean
* @return generated ObjectName
* @throws MalformedObjectNameException
*/
@@ -140,48 +139,40 @@
String cid = getComponentId(ep);
String id = VALUE_UNKNOWN.equals(cid) ? getEndpointId(ep)
: "[" + cid + "]" + getEndpointId(ep);
- String group = (String)route.getProperties().get(Route.GROUP_PROPERTY);
StringBuffer buffer = new StringBuffer();
buffer.append(domainName + ":");
buffer.append(KEY_CONTEXT + "=" + ctxid + ",");
- buffer.append(KEY_GROUP + "=" + GROUP_ROUTES + ",");
- buffer.append(KEY_BUILDER + "=" + (group != null ? group : VALUE_DEFAULT_BUILDER) + ",");
- buffer.append(KEY_ROUTE_TYPE + "=" + route.getProperties().get(Route.PARENT_PROPERTY) + ",");
buffer.append(KEY_ROUTE + "=" + id + ",");
- buffer.append(KEY_TYPE + "=" + VALUE_ROUTE);
+ buffer.append(KEY_TYPE + "=" + TYPE_ROUTE + ",");
+ buffer.append(KEY_NAME + "=" + id);
return createObjectName(buffer);
}
/**
- * Implements the naming strategy for a {@link PerformanceCounter}.
- * The convention used for a {@link ManagedEndpoint} ObjectName is:
- * <tt><domain>:context=<context>,type=Routes,endpoint=[urlPrefix]localPart</tt>
- *
- * @param context the camel context
- * @param mbean
+ * Implements the naming strategy for a {@link ProcessorType}.
+ * The convention used for a {@link ProcessorType} ObjectName is:
+ * <tt><domain>:context=<context-name>,route=<route-name>,type=processor,name=<processor-name></tt>
+ *
* @param routeContext
- * @return generated ObjectName
+ * @param processor
+ * @return
* @throws MalformedObjectNameException
*/
- public ObjectName getObjectName(CamelContext context, PerformanceCounter mbean, RouteContext routeContext)
- throws MalformedObjectNameException {
-
+ public ObjectName getObjectName(RouteContext routeContext,
+ ProcessorType processor) throws MalformedObjectNameException {
RouteType route = routeContext.getRoute();
Endpoint<? extends Exchange> ep = routeContext.getEndpoint();
String ctxid = ep != null ? getContextId(ep.getCamelContext()) : VALUE_UNKNOWN;
String cid = getComponentId(ep);
String id = VALUE_UNKNOWN.equals(cid) ? getEndpointId(ep) : "[" + cid + "]" + getEndpointId(ep);
- String group = route.getGroup();
StringBuffer buffer = new StringBuffer();
buffer.append(domainName + ":");
buffer.append(KEY_CONTEXT + "=" + ctxid + ",");
- buffer.append(KEY_GROUP + "=" + GROUP_ROUTES + ",");
- buffer.append(KEY_BUILDER + "=" + (group != null ? group : VALUE_DEFAULT_BUILDER) + ",");
- buffer.append(KEY_ROUTE_TYPE + "=" + route.hashCode() + ",");
buffer.append(KEY_ROUTE + "=" + id + ",");
- buffer.append(KEY_TYPE + "=" + VALUE_STATS);
+ buffer.append(KEY_TYPE + "=" + TYPE_PROCESSOR + ",");
+ buffer.append(KEY_NAME + "=" + ObjectName.quote(processor.toString()));
return createObjectName(buffer);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationAgentImpl.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationAgentImpl.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationAgentImpl.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationAgentImpl.java Sat May 17 00:05:30 2008
@@ -71,16 +71,12 @@
private String jmxDomainName;
private int jmxConnectorPort;
private String jmxConnectorPath;
- private CamelNamingStrategy namingStrategy;
private boolean createConnector = true;
private boolean usePlatformMBeanServer;
public InstrumentationAgentImpl() {
assembler = new MetadataMBeanInfoAssembler();
assembler.setAttributeSource(new AnnotationJmxAttributeSource());
- // naming = new
- // CamelNamingStrategy(agent.getMBeanServer().getDefaultDomain());
- namingStrategy = new CamelNamingStrategy();
}
public CamelContext getCamelContext() {
@@ -140,14 +136,6 @@
server.unregisterMBean(name);
}
- public CamelNamingStrategy getNamingStrategy() {
- return namingStrategy;
- }
-
- public void setNamingStrategy(CamelNamingStrategy namingStrategy) {
- this.namingStrategy = namingStrategy;
- }
-
protected void doStart() throws Exception {
ObjectHelper.notNull(context, "camelContext");
@@ -162,13 +150,13 @@
jmxDomainName = DEFAULT_DOMAIN;
}
}
- configureDomainName();
LOG.debug("Starting JMX agent on server: " + getMBeanServer());
if (context instanceof DefaultCamelContext) {
DefaultCamelContext dc = (DefaultCamelContext)context;
- InstrumentationLifecycleStrategy ls = new InstrumentationLifecycleStrategy(this);
+ InstrumentationLifecycleStrategy ls = new InstrumentationLifecycleStrategy(
+ this, new CamelNamingStrategy(jmxDomainName));
dc.setLifecycleStrategy(ls);
ls.onContextCreate(context);
}
@@ -232,17 +220,10 @@
public void enableJmx(String domainName, String connectorPath, int port) {
jmxEnabled = true;
jmxDomainName = domainName;
- configureDomainName();
jmxConnectorPath = connectorPath;
jmxConnectorPort = port;
}
- protected void configureDomainName() {
- if (jmxDomainName != null) {
- namingStrategy.setDomainName(jmxDomainName);
- }
- }
-
protected void createMBeanServer() {
String hostName = DEFAULT_HOST;
boolean canAccessSystemProps = true;
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java?rev=657300&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java Sat May 17 00:05:30 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Map;
+
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.spi.InterceptStrategy;
+
+/**
+ * This strategy class wraps targeted processors with
+ * {@link InstrumentationProcessor}. Each InstrumentationProcessor is
+ * into embeds a {@link PerformanceCounter}. This class looks up a map
+ * to determine which PerformanceCounter should go into the
+ * InstrumentationProcessor for any particular target processor.
+ *
+ * @version $Revision$
+ */
+public class InstrumentationInterceptStrategy implements InterceptStrategy {
+
+ private Map<ProcessorType, PerformanceCounter> counterMap;
+
+ public InstrumentationInterceptStrategy(Map<ProcessorType, PerformanceCounter> counterMap) {
+ this.counterMap = counterMap;
+ }
+
+ public Processor wrapProcessorInInterceptors(ProcessorType processorType,
+ Processor target) throws Exception {
+
+ Processor retval = target;
+ PerformanceCounter counter = counterMap.get(processorType);
+
+ if (counter != null) {
+ InstrumentationProcessor wrapper = new InstrumentationProcessor(counter);
+ wrapper.setProcessor(target);
+ retval = wrapper;
+ }
+
+ return retval;
+ }
+
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java Sat May 17 00:05:30 2008
@@ -17,6 +17,8 @@
package org.apache.camel.management;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import javax.management.JMException;
@@ -28,6 +30,8 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.RouteContext;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.model.RouteType;
import org.apache.camel.spi.InstrumentationAgent;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.commons.logging.Log;
@@ -39,9 +43,15 @@
private InstrumentationAgent agent;
private CamelNamingStrategy namingStrategy;
- public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
+ // A map (Endpoint -> InstrumentationProcessor) to facilitate
+ // adding per-route interceptor and registering ManagedRoute MBean
+ private Map<Endpoint, InstrumentationProcessor> interceptorMap =
+ new HashMap<Endpoint, InstrumentationProcessor>();
+
+ public InstrumentationLifecycleStrategy(InstrumentationAgent agent,
+ CamelNamingStrategy namingStrategy) {
this.agent = agent;
- setNamingStrategy(agent.getNamingStrategy());
+ this.namingStrategy = namingStrategy;
}
public void onContextCreate(CamelContext context) {
@@ -69,6 +79,14 @@
for (Route route : routes) {
try {
ManagedRoute mr = new ManagedRoute(route);
+ // retrieve the per-route intercept for this route
+ InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
+ if (interceptor == null) {
+ LOG.warn("Instrumentation processor not found for route endpoint " +
+ route.getEndpoint());
+ } else {
+ interceptor.setCounter(mr);
+ }
agent.register(mr, getNamingStrategy().getObjectName(mr));
} catch (JMException e) {
LOG.warn("Could not register Route MBean", e);
@@ -88,18 +106,53 @@
}
public void onRouteContextCreate(RouteContext routeContext) {
- PerformanceCounter mc = new PerformanceCounter();
- routeContext.getRoute().intercept(new InstrumentationProcessor(mc));
- /*
- * Merge performance counter with the MBean it represents instead of
- * registering a new MBean
- */
- try {
- agent.register(mc, getNamingStrategy().getObjectName(routeContext.getCamelContext(), mc,
- routeContext));
- } catch (JMException e) {
- LOG.warn("Could not register Counter MBean", e);
+ // Create a map (ProcessorType -> PerformanceCounter)
+ // to be passed to InstrumentationInterceptStrategy.
+ Map<ProcessorType, PerformanceCounter> counterMap =
+ new HashMap<ProcessorType, PerformanceCounter>();
+
+ // Each processor in a route will have its own performance counter
+ // The performance counter are MBeans that we register with MBeanServer.
+ // These performance counter will be embedded
+ // to InstrumentationProcessor and wrap the appropriate processor
+ // by InstrumentationInterceptStrategy.
+ RouteType route = routeContext.getRoute();
+ for (ProcessorType processor : route.getOutputs()) {
+ PerformanceCounter pc = new PerformanceCounter();
+ try {
+ agent.register(pc, getNamingStrategy().getObjectName(
+ routeContext, processor));
+ } catch (JMException e) {
+ LOG.warn("Could not register Counter MBean", e);
+ }
+ counterMap.put(processor, pc);
+ }
+
+ routeContext.setInterceptStrategy(
+ new InstrumentationInterceptStrategy(counterMap));
+
+ // Add an InstrumentationProcessor at the beginning of each route and
+ // set up the interceptorMap for onRoutesAdd() method to register the
+ // ManagedRoute MBeans.
+ RouteType routeType = routeContext.getRoute();
+ if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
+ if (routeType.getInputs().size() > 1) {
+ LOG.warn("Add InstrumentationProcessor to first input only.");
+ }
+
+ Endpoint endpoint = routeType.getInputs().get(0).getEndpoint();
+ ProcessorType<?>[] outputs =
+ routeType.getOutputs().toArray(new ProcessorType<?>[0]);
+
+ routeType.clearOutput();
+ InstrumentationProcessor processor = new InstrumentationProcessor();
+ routeType.intercept(processor);
+ for (ProcessorType<?> output : outputs) {
+ routeType.addOutput(output);
+ }
+
+ interceptorMap.put(endpoint, processor);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Sat May 17 00:05:30 2008
@@ -27,6 +27,13 @@
this.counter = counter;
}
+ InstrumentationProcessor() {
+ }
+
+ public void setCounter(PerformanceCounter counter) {
+ this.counter = counter;
+ }
+
public void process(Exchange exchange) throws Exception {
long startTime = System.nanoTime();
super.process(exchange);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Sat May 17 00:05:30 2008
@@ -47,16 +47,13 @@
import org.apache.camel.model.language.LanguageExpression;
import org.apache.camel.processor.ConvertBodyProcessor;
import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.processor.idempotent.MessageIdRepository;
import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.Registry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -1434,35 +1431,13 @@
throw new RuntimeCamelException("target provided.");
}
- // Interceptors are optional
- DelegateProcessor first = null;
- DelegateProcessor last = null;
-/*
-
- List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute()
- .getInterceptors());
- List<InterceptorType> list = getInterceptors();
- for (InterceptorType interceptorType : list) {
- if (!interceptors.contains(interceptorType)) {
- interceptors.add(interceptorType);
- }
- }
- for (InterceptorType interceptorRef : interceptors) {
- DelegateProcessor p = interceptorRef.createInterceptor(routeContext);
- if (first == null) {
- first = p;
- }
- if (last != null) {
- last.setProcessor(p);
- }
- last = p;
+ InterceptStrategy strategy = routeContext.getInterceptStrategy();
+ if (strategy != null) {
+ return strategy.wrapProcessorInInterceptors(this, target);
+ } else {
+ return target;
}
- if (last != null) {
- last.setProcessor(target);
- }
-*/
- return first == null ? target : first;
}
/**
@@ -1511,6 +1486,7 @@
List<Processor> list = new ArrayList<Processor>();
for (ProcessorType output : outputs) {
Processor processor = output.createProcessor(routeContext);
+ processor = output.wrapProcessorInInterceptors(routeContext, processor);
list.add(processor);
}
Processor processor = null;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InstrumentationAgent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InstrumentationAgent.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InstrumentationAgent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InstrumentationAgent.java Sat May 17 00:05:30 2008
@@ -21,7 +21,6 @@
import javax.management.ObjectName;
import org.apache.camel.Service;
-import org.apache.camel.management.CamelNamingStrategy;
public interface InstrumentationAgent extends Service {
@@ -59,5 +58,4 @@
*/
MBeanServer getMBeanServer();
- CamelNamingStrategy getNamingStrategy();
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java?rev=657300&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java Sat May 17 00:05:30 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+
+ /**
+ * The purpose of this interface is to allow an implementation to wrap
+ * processors in a route with interceptors. For example, a possible
+ * usecase is to gather performance statistics at the processor's level.
+ *
+ * @version $Revision$
+ */
+public interface InterceptStrategy {
+
+ /**
+ * This method is invoked by
+ * {@link ProcessorType#wrapProcessor(RouteContext, Processor)
+ * to give the implementor an opportunity to wrap the target processor
+ * in a route.
+ *
+ * @param processorType the object that invokes this method
+ * @param target the processor to be wrapped
+ * @return processor wrapped with an interceptor or not wrapped
+ * @throws Exception
+ */
+ Processor wrapProcessorInInterceptors(ProcessorType processorType,
+ Processor target) throws Exception;
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java?rev=657300&r1=657299&r2=657300&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java Sat May 17 00:05:30 2008
@@ -41,13 +41,26 @@
resolveMandatoryEndpoint("mock:end", MockEndpoint.class);
- ObjectName name = new ObjectName(domainName + ":group=endpoints,*");
- Set s = iAgent.getMBeanServer().queryNames(name, null);
-
+ Set s = iAgent.getMBeanServer().queryNames(
+ new ObjectName(domainName + ":type=endpoint,*"), null);
+ assertEquals("Could not find 2 endpoints: " + s, 2, s.size());
+
+ s = iAgent.getMBeanServer().queryNames(
+ new ObjectName(domainName + ":type=context,*"), null);
+ assertEquals("Could not find 1 context: " + s, 1, s.size());
+
+ s = iAgent.getMBeanServer().queryNames(
+ new ObjectName(domainName + ":type=processor,*"), null);
+ assertEquals("Could not find 1 processor: " + s, 1, s.size());
+
+ s = iAgent.getMBeanServer().queryNames(
+ new ObjectName(domainName + ":type=route,*"), null);
+ assertEquals("Could not find 1 route: " + s, 1, s.size());
+
if (sleepSoYouCanBrowseInJConsole) {
Thread.sleep(100000);
}
- assertEquals("Could not find 2 endpoints: " + s, 2, s.size());
+
}
public void testCounters() throws Exception {
@@ -58,7 +71,12 @@
resultEndpoint.assertIsSatisfied();
MBeanServer mbs = iAgent.getMBeanServer();
- ObjectName name = new ObjectName(domainName + ":type=Stats,*");
+ verifyCounter(mbs, new ObjectName(domainName + ":type=route,*"));
+ verifyCounter(mbs, new ObjectName(domainName + ":type=processor,*"));
+
+ }
+
+ private void verifyCounter(MBeanServer mbs, ObjectName name) throws Exception {
Set s = mbs.queryNames(name, null);
assertEquals("Found mbeans: " + s, 1, s.size());
@@ -96,6 +114,7 @@
valueofMeanProcessingTime);
assertTrue(valueofMeanProcessingTime >= valueofMinProcessingTime
&& valueofMeanProcessingTime <= valueofMaxProcessingTime);
+
}
protected void enableJmx() {