You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/26 13:36:21 UTC
[8/8] camel git commit: CAMEL-9014: Option to turn on extended JMX
statistics for EIPs to track fine-grained utilization statistics, such as
which dynamic endpoints they send to and how often.
CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine-grained utilization statistics, such as which dynamic endpoints they send to and how often.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e55b0e8c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e55b0e8c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e55b0e8c
Branch: refs/heads/master
Commit: e55b0e8ce07950b44cfe067470f351a6cbe0d95f
Parents: 88ab0c4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 26 13:43:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jul 26 13:43:08 2015 +0200
----------------------------------------------------------------------
.../mbean/ManagedPollEnricherMBean.java | 8 ++-
.../org/apache/camel/impl/ConsumerCache.java | 66 ++++++++++++++++++--
.../org/apache/camel/impl/ProducerCache.java | 7 +--
.../management/mbean/ManagedPollEnricher.java | 51 ++++++++++++++-
.../apache/camel/processor/PollEnricher.java | 5 ++
.../management/ManagedPollEnricherTest.java | 15 ++++-
6 files changed, 141 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
index 623b85c..da11262 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.api.management.mbean;
+import javax.management.openmbean.TabularData;
+
import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
-public interface ManagedPollEnricherMBean extends ManagedProcessorMBean {
+public interface ManagedPollEnricherMBean extends ManagedProcessorMBean, ManagedExtendedInformation {
@ManagedAttribute(description = "The language for the expression")
String getExpressionLanguage();
@@ -38,4 +41,7 @@ public interface ManagedPollEnricherMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint")
Boolean isAggregateOnException();
+ @ManagedOperation(description = "Statistics of the endpoints that has been poll enriched from")
+ TabularData extendedInformation();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
index d957efe..399012c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
@@ -26,6 +26,7 @@ import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ServicePoolAware;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper;
@@ -41,11 +42,16 @@ import org.slf4j.LoggerFactory;
*/
public class ConsumerCache extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);
+
private final CamelContext camelContext;
private final ServicePool<Endpoint, PollingConsumer> pool;
private final Map<String, PollingConsumer> consumers;
private final Object source;
+ private EndpointUtilizationStatistics statistics;
+ private boolean extendedStatistics;
+ private int maxCacheSize;
+
public ConsumerCache(Object source, CamelContext camelContext) {
this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
}
@@ -63,6 +69,27 @@ public class ConsumerCache extends ServiceSupport {
this.consumers = cache;
this.source = source;
this.pool = pool;
+ if (consumers instanceof LRUCache) {
+ maxCacheSize = ((LRUCache) consumers).getMaxCacheSize();
+ }
+
+ // only if JMX is enabled
+ if (camelContext.getManagementStrategy().getManagementAgent() != null) {
+ this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
+ } else {
+ this.extendedStatistics = false;
+ }
+ }
+
+ public boolean isExtendedStatistics() {
+ return extendedStatistics;
+ }
+
+ /**
+ * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
+ */
+ public void setExtendedStatistics(boolean extendedStatistics) {
+ this.extendedStatistics = extendedStatistics;
}
/**
@@ -156,8 +183,15 @@ public class ConsumerCache extends ServiceSupport {
}
}
}
+
+ if (answer != null) {
+ // record statistics
+ if (extendedStatistics) {
+ statistics.onHit(key);
+ }
+ }
+
return answer;
-
}
public Exchange receive(Endpoint endpoint) {
@@ -295,6 +329,9 @@ public class ConsumerCache extends ServiceSupport {
LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
cache.resetStatistics();
}
+ if (statistics != null) {
+ statistics.clear();
+ }
}
/**
@@ -302,6 +339,13 @@ public class ConsumerCache extends ServiceSupport {
*/
public synchronized void purge() {
consumers.clear();
+ if (statistics != null) {
+ statistics.clear();
+ }
+ }
+
+ public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
+ return statistics;
}
@Override
@@ -310,15 +354,29 @@ public class ConsumerCache extends ServiceSupport {
}
protected void doStart() throws Exception {
+ if (extendedStatistics) {
+ int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
+ statistics = new DefaultEndpointUtilizationStatistics(max);
+ }
+
ServiceHelper.startServices(consumers.values());
}
protected void doStop() throws Exception {
// when stopping we intend to shutdown
- ServiceHelper.stopAndShutdownServices(consumers.values());
+ ServiceHelper.stopAndShutdownServices(statistics, pool);
+ try {
+ ServiceHelper.stopAndShutdownServices(consumers.values());
+ } finally {
+ // ensure consumers are removed, and also from JMX
+ for (PollingConsumer consumer : consumers.values()) {
+ getCamelContext().removeService(consumer);
+ }
+ }
consumers.clear();
- // we need to stop the pool service here
- ServiceHelper.stopService(pool);
+ if (statistics != null) {
+ statistics.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 7b2cd37..5b79954 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -67,9 +67,6 @@ public class ProducerCache extends ServiceSupport {
public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
- if (producers instanceof LRUCache) {
- maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
- }
}
public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
@@ -81,6 +78,9 @@ public class ProducerCache extends ServiceSupport {
this.camelContext = camelContext;
this.pool = producerServicePool;
this.producers = cache;
+ if (producers instanceof LRUCache) {
+ maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
+ }
// only if JMX is enabled
if (camelContext.getManagementStrategy().getManagementAgent() != null) {
@@ -451,7 +451,6 @@ public class ProducerCache extends ServiceSupport {
if (extendedStatistics) {
statistics.onHit(key);
}
-
}
return answer;
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
index 0b71e8f..ae0b7f1 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
@@ -16,12 +16,22 @@
*/
package org.apache.camel.management.mbean;
+import java.util.Map;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
import org.apache.camel.CamelContext;
import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
import org.apache.camel.api.management.mbean.ManagedPollEnricherMBean;
import org.apache.camel.model.PollEnrichDefinition;
import org.apache.camel.processor.PollEnricher;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
/**
@@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport;
public class ManagedPollEnricher extends ManagedProcessor implements ManagedPollEnricherMBean {
private final PollEnricher processor;
private String uri;
+ private boolean sanitize;
public ManagedPollEnricher(CamelContext context, PollEnricher processor, PollEnrichDefinition definition) {
super(context, processor, definition);
@@ -39,7 +50,7 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
public void init(ManagementStrategy strategy) {
super.init(strategy);
- boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+ sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
uri = getDefinition().getExpression().getExpression();
if (sanitize) {
uri = URISupport.sanitizeUri(uri);
@@ -47,6 +58,14 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
}
@Override
+ public synchronized void reset() {
+ super.reset();
+ if (processor.getEndpointUtilizationStatistics() != null) {
+ processor.getEndpointUtilizationStatistics().clear();
+ }
+ }
+
+ @Override
public PollEnrichDefinition getDefinition() {
return (PollEnrichDefinition) super.getDefinition();
}
@@ -85,4 +104,34 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
public Boolean isAggregateOnException() {
return processor.isAggregateOnException();
}
+
+ @Override
+ public TabularData extendedInformation() {
+ try {
+ TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType());
+
+ EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics();
+ if (stats != null) {
+ for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) {
+ CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType();
+ String url = entry.getKey();
+ if (sanitize) {
+ url = URISupport.sanitizeUri(url);
+ }
+
+ Long hits = entry.getValue();
+ if (hits == null) {
+ hits = 0L;
+ }
+
+ CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits});
+ answer.put(data);
+ }
+ }
+ return answer;
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 998926b..691e55d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -28,6 +28,7 @@ import org.apache.camel.PollingConsumer;
import org.apache.camel.impl.ConsumerCache;
import org.apache.camel.impl.EmptyConsumerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -94,6 +95,10 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
return expression;
}
+ public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
+ return consumerCache.getEndpointUtilizationStatistics();
+ }
+
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
index 657a374..790c512 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
@@ -20,6 +20,8 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.TabularData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ManagementStatisticsLevel;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -29,6 +31,13 @@ import org.apache.camel.component.mock.MockEndpoint;
*/
public class ManagedPollEnricherTest extends ManagementTestSupport {
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended);
+ return context;
+ }
+
public void testManagePollEnricher() throws Exception {
// JMX tests dont work well on AIX CI servers (hangs them)
if (isPlatform("aix")) {
@@ -68,7 +77,11 @@ public class ManagedPollEnricherTest extends ManagementTestSupport {
String uri = (String) mbeanServer.getAttribute(on, "Expression");
assertEquals("seda:${header.whereto}", uri);
- TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+ TabularData data = (TabularData) mbeanServer.invoke(on, "extendedInformation", null, null);
+ assertNotNull(data);
+ assertEquals(1, data.size());
+
+ data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
assertNotNull(data);
assertEquals(3, data.size());