You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/26 13:36:21 UTC

[8/8] camel git commit: CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine grained utilization statistics such as which and how often they send to dynamic endpoints.

CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine-grained utilization statistics, such as which dynamic endpoints they send to and how often.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e55b0e8c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e55b0e8c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e55b0e8c

Branch: refs/heads/master
Commit: e55b0e8ce07950b44cfe067470f351a6cbe0d95f
Parents: 88ab0c4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 26 13:43:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jul 26 13:43:08 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedPollEnricherMBean.java         |  8 ++-
 .../org/apache/camel/impl/ConsumerCache.java    | 66 ++++++++++++++++++--
 .../org/apache/camel/impl/ProducerCache.java    |  7 +--
 .../management/mbean/ManagedPollEnricher.java   | 51 ++++++++++++++-
 .../apache/camel/processor/PollEnricher.java    |  5 ++
 .../management/ManagedPollEnricherTest.java     | 15 ++++-
 6 files changed, 141 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
index 623b85c..da11262 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.api.management.mbean;
 
+import javax.management.openmbean.TabularData;
+
 import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
 
-public interface ManagedPollEnricherMBean extends ManagedProcessorMBean {
+public interface ManagedPollEnricherMBean extends ManagedProcessorMBean, ManagedExtendedInformation {
 
     @ManagedAttribute(description = "The language for the expression")
     String getExpressionLanguage();
@@ -38,4 +41,7 @@ public interface ManagedPollEnricherMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint")
     Boolean isAggregateOnException();
 
+    @ManagedOperation(description = "Statistics of the endpoints that has been poll enriched from")
+    TabularData extendedInformation();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
index d957efe..399012c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
@@ -26,6 +26,7 @@ import org.apache.camel.IsSingleton;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ServicePoolAware;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.CamelContextHelper;
@@ -41,11 +42,16 @@ import org.slf4j.LoggerFactory;
  */
 public class ConsumerCache extends ServiceSupport {
     private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);
+
     private final CamelContext camelContext;
     private final ServicePool<Endpoint, PollingConsumer> pool;
     private final Map<String, PollingConsumer> consumers;
     private final Object source;
 
+    private EndpointUtilizationStatistics statistics;
+    private boolean extendedStatistics;
+    private int maxCacheSize;
+
     public ConsumerCache(Object source, CamelContext camelContext) {
         this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
@@ -63,6 +69,27 @@ public class ConsumerCache extends ServiceSupport {
         this.consumers = cache;
         this.source = source;
         this.pool = pool;
+        if (consumers instanceof LRUCache) {
+            maxCacheSize = ((LRUCache) consumers).getMaxCacheSize();
+        }
+
+        // only if JMX is enabled
+        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
+            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
+        } else {
+            this.extendedStatistics = false;
+        }
+    }
+
+    public boolean isExtendedStatistics() {
+        return extendedStatistics;
+    }
+
+    /**
+     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
+     */
+    public void setExtendedStatistics(boolean extendedStatistics) {
+        this.extendedStatistics = extendedStatistics;
     }
 
     /**
@@ -156,8 +183,15 @@ public class ConsumerCache extends ServiceSupport {
                 }
             }
         }
+
+        if (answer != null) {
+            // record statistics
+            if (extendedStatistics) {
+                statistics.onHit(key);
+            }
+        }
+
         return answer;
-        
     }
  
     public Exchange receive(Endpoint endpoint) {
@@ -295,6 +329,9 @@ public class ConsumerCache extends ServiceSupport {
             LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
             cache.resetStatistics();
         }
+        if (statistics != null) {
+            statistics.clear();
+        }
     }
 
     /**
@@ -302,6 +339,13 @@ public class ConsumerCache extends ServiceSupport {
      */
     public synchronized void purge() {
         consumers.clear();
+        if (statistics != null) {
+            statistics.clear();
+        }
+    }
+
+    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
+        return statistics;
     }
 
     @Override
@@ -310,15 +354,29 @@ public class ConsumerCache extends ServiceSupport {
     }
 
     protected void doStart() throws Exception {
+        if (extendedStatistics) {
+            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
+            statistics = new DefaultEndpointUtilizationStatistics(max);
+        }
+
         ServiceHelper.startServices(consumers.values());
     }
 
     protected void doStop() throws Exception {
         // when stopping we intend to shutdown
-        ServiceHelper.stopAndShutdownServices(consumers.values());
+        ServiceHelper.stopAndShutdownServices(statistics, pool);
+        try {
+            ServiceHelper.stopAndShutdownServices(consumers.values());
+        } finally {
+            // ensure consumers are removed, and also from JMX
+            for (PollingConsumer consumer : consumers.values()) {
+                getCamelContext().removeService(consumer);
+            }
+        }
         consumers.clear();
-        // we need to stop the pool service here
-        ServiceHelper.stopService(pool);
+        if (statistics != null) {
+            statistics.clear();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 7b2cd37..5b79954 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -67,9 +67,6 @@ public class ProducerCache extends ServiceSupport {
 
     public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
         this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
-        if (producers instanceof LRUCache) {
-            maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
-        }
     }
 
     public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
@@ -81,6 +78,9 @@ public class ProducerCache extends ServiceSupport {
         this.camelContext = camelContext;
         this.pool = producerServicePool;
         this.producers = cache;
+        if (producers instanceof LRUCache) {
+            maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
+        }
 
         // only if JMX is enabled
         if (camelContext.getManagementStrategy().getManagementAgent() != null) {
@@ -451,7 +451,6 @@ public class ProducerCache extends ServiceSupport {
             if (extendedStatistics) {
                 statistics.onHit(key);
             }
-
         }
 
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
index 0b71e8f..ae0b7f1 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java
@@ -16,12 +16,22 @@
  */
 package org.apache.camel.management.mbean;
 
+import java.util.Map;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
 import org.apache.camel.api.management.mbean.ManagedPollEnricherMBean;
 import org.apache.camel.model.PollEnrichDefinition;
 import org.apache.camel.processor.PollEnricher;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 
 /**
@@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport;
 public class ManagedPollEnricher extends ManagedProcessor implements ManagedPollEnricherMBean {
     private final PollEnricher processor;
     private String uri;
+    private boolean sanitize;
 
     public ManagedPollEnricher(CamelContext context, PollEnricher processor, PollEnrichDefinition definition) {
         super(context, processor, definition);
@@ -39,7 +50,7 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
 
     public void init(ManagementStrategy strategy) {
         super.init(strategy);
-        boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+        sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
         uri = getDefinition().getExpression().getExpression();
         if (sanitize) {
             uri = URISupport.sanitizeUri(uri);
@@ -47,6 +58,14 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
     }
 
     @Override
+    public synchronized void reset() {
+        super.reset();
+        if (processor.getEndpointUtilizationStatistics() != null) {
+            processor.getEndpointUtilizationStatistics().clear();
+        }
+    }
+
+    @Override
     public PollEnrichDefinition getDefinition() {
         return (PollEnrichDefinition) super.getDefinition();
     }
@@ -85,4 +104,34 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll
     public Boolean isAggregateOnException() {
         return processor.isAggregateOnException();
     }
+
+    @Override
+    public TabularData extendedInformation() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType());
+
+            EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics();
+            if (stats != null) {
+                for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) {
+                    CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType();
+                    String url = entry.getKey();
+                    if (sanitize) {
+                        url = URISupport.sanitizeUri(url);
+                    }
+
+                    Long hits = entry.getValue();
+                    if (hits == null) {
+                        hits = 0L;
+                    }
+
+                    CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits});
+                    answer.put(data);
+                }
+            }
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 998926b..691e55d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -28,6 +28,7 @@ import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.impl.EmptyConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -94,6 +95,10 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
         return expression;
     }
 
+    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
+        return consumerCache.getEndpointUtilizationStatistics();
+    }
+
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
index 657a374..790c512 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java
@@ -20,6 +20,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.TabularData;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.ManagementStatisticsLevel;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -29,6 +31,13 @@ import org.apache.camel.component.mock.MockEndpoint;
  */
 public class ManagedPollEnricherTest extends ManagementTestSupport {
 
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended);
+        return context;
+    }
+
     public void testManagePollEnricher() throws Exception {
         // JMX tests dont work well on AIX CI servers (hangs them)
         if (isPlatform("aix")) {
@@ -68,7 +77,11 @@ public class ManagedPollEnricherTest extends ManagementTestSupport {
         String uri = (String) mbeanServer.getAttribute(on, "Expression");
         assertEquals("seda:${header.whereto}", uri);
 
-        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+        TabularData data = (TabularData) mbeanServer.invoke(on, "extendedInformation", null, null);
+        assertNotNull(data);
+        assertEquals(1, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
         assertNotNull(data);
         assertEquals(3, data.size());