You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2015/03/02 15:14:20 UTC

camel git commit: Fixes CAMEL-8385: Add a OldestInflightDuration and OldestInflightExchangeId attribute to route MBeans

Repository: camel
Updated Branches:
  refs/heads/master 552fe3d1b -> 8f6d4adb0


Fixes CAMEL-8385: Add a OldestInflightDuration and OldestInflightExchangeId attribute to route MBeans


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f6d4adb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f6d4adb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f6d4adb

Branch: refs/heads/master
Commit: 8f6d4adb0161c524303e9bf7688c45976b89084a
Parents: 552fe3d
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Sat Feb 21 10:46:00 2015 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Mar 2 09:14:00 2015 -0500

----------------------------------------------------------------------
 .../api/management/mbean/ManagedRouteMBean.java |   7 ++
 .../camel/management/mbean/ManagedRoute.java    | 124 ++++++++++++++++++-
 .../ManagedInflightStatisticsTest.java          | 113 +++++++++++++++++
 3 files changed, 239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index 148c688..7083a0f 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -114,4 +114,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
     @ManagedOperation(description = "Returns the JSON representation of all the static endpoints (and possible dynamic) defined in this route")
     String createRouteStaticEndpointJson(boolean includeDynamic);
 
+    @ManagedAttribute(description = "Oldest inflight exchange duration")
+    Long getOldestInflightDuration();
+
+    @ManagedAttribute(description = "Oldest inflight exchange id")
+    String getOldestInflightExchangeId();
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index d186188..fba8b62 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import javax.management.AttributeValueExp;
 import javax.management.MBeanServer;
@@ -33,6 +35,7 @@ import javax.management.QueryExp;
 import javax.management.StringValueExp;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.ManagementStatisticsLevel;
 import org.apache.camel.Route;
 import org.apache.camel.ServiceStatus;
@@ -43,6 +46,7 @@ import org.apache.camel.api.management.mbean.ManagedRouteMBean;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ModelHelper;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.util.ObjectHelper;
 
@@ -53,6 +57,8 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
     protected final String description;
     protected final ModelCamelContext context;
     private final LoadTriplet load = new LoadTriplet();
+    private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>();
+    private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>();
 
     public ManagedRoute(ModelCamelContext context, Route route) {
         this.route = route;
@@ -178,7 +184,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
     public void onTimer() {
         load.update(getInflightExchanges());
     }
-    
+
     public void start() throws Exception {
         if (!context.getStatus().isStarted()) {
             throw new IllegalArgumentException("CamelContext is not started");
@@ -211,7 +217,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         if (!context.getStatus().isStarted()) {
             throw new IllegalArgumentException("CamelContext is not started");
         }
-        String routeId = getRouteId(); 
+        String routeId = getRouteId();
         context.stopRoute(routeId);
         context.removeRoute(routeId);
     }
@@ -220,7 +226,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         if (!context.getStatus().isStarted()) {
             throw new IllegalArgumentException("CamelContext is not started");
         }
-        String routeId = getRouteId(); 
+        String routeId = getRouteId();
         context.stopRoute(routeId, timeout, TimeUnit.SECONDS);
         context.removeRoute(routeId);
     }
@@ -329,6 +335,15 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         String stat = dumpStatsAsXml(fullStats);
         answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
         answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
+        answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
+        InFlightKey oldestInflightEntry = getOldestInflightEntry();
+        if (oldestInflightEntry != null) {
+            answer.append(" oldestInflightExchangeId=\"\"");
+            answer.append(" oldestInflightDuration=\"\"");
+        } else {
+            answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\"");
+            answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\"");
+        }
         answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
 
         if (includeProcessors) {
@@ -369,7 +384,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
 
     @Override
     public boolean equals(Object o) {
-        return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute)o).route));
+        return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route));
     }
 
     @Override
@@ -377,6 +392,106 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         return route.hashCode();
     }
 
+    private InFlightKey getOldestInflightEntry() {
+        Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry();
+        if (entry != null) {
+            return entry.getKey();
+        }
+        return null;
+    }
+
+    public Long getOldestInflightDuration() {
+        InFlightKey oldest = getOldestInflightEntry();
+        if (oldest == null) {
+            return null;
+        }
+        return System.currentTimeMillis() - oldest.timeStamp;
+    }
+
+    public String getOldestInflightExchangeId() {
+        InFlightKey oldest = getOldestInflightEntry();
+        if (oldest == null) {
+            return null;
+        }
+        return oldest.exchangeId;
+    }
+
+    @Override
+    public void init(ManagementStrategy strategy) {
+        super.init(strategy);
+        exchangesInFlightStartTimestamps.clear();
+    }
+
+    @Override
+    public synchronized void processExchange(Exchange exchange) {
+        InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId());
+        exchangesInFlightKeys.put(exchange.getExchangeId(), key);
+        exchangesInFlightStartTimestamps.put(key, key.timeStamp);
+        super.processExchange(exchange);
+    }
+
+    @Override
+    public synchronized void completedExchange(Exchange exchange, long time) {
+        InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId());
+        if (key != null) {
+            exchangesInFlightStartTimestamps.remove(key);
+        }
+        super.completedExchange(exchange, time);
+    }
+
+    private static class InFlightKey implements Comparable<InFlightKey> {
+
+        private final Long timeStamp;
+        private final String exchangeId;
+
+        InFlightKey(Long timeStamp, String exchangeId) {
+            this.exchangeId = exchangeId;
+            this.timeStamp = timeStamp;
+        }
+
+        @Override
+        public int compareTo(InFlightKey o) {
+            int compare = Long.compare(timeStamp, o.timeStamp);
+            if (compare == 0) {
+                return exchangeId.compareTo(o.exchangeId);
+            }
+            return compare;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            InFlightKey that = (InFlightKey) o;
+
+            if (!exchangeId.equals(that.exchangeId)) {
+                return false;
+            }
+            if (!timeStamp.equals(that.timeStamp)) {
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = timeStamp.hashCode();
+            result = 31 * result + exchangeId.hashCode();
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return exchangeId;
+        }
+    }
+
     /**
      * Used for sorting the processor mbeans accordingly to their index.
      */
@@ -387,5 +502,4 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
             return o1.getIndex().compareTo(o2.getIndex());
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
new file mode 100644
index 0000000..9807a44
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class ManagedInflightStatisticsTest extends ManagementTestSupport {
+
+    public void testManageStatistics() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
+        assertEquals(1, set.size());
+        ObjectName on = set.iterator().next();
+
+        Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+        assertEquals(0, inflight.longValue());
+        Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNull(ts);
+        String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNull(id);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(2);
+
+        // start some exchanges.
+        template.asyncSendBody("direct:start", 1000L);
+        Thread.sleep(500);
+        template.asyncSendBody("direct:start", 1000L);
+        Thread.sleep(100);
+
+        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+        assertEquals(2, inflight.longValue());
+
+        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNotNull(ts);
+        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNotNull(id);
+
+        // Lets wait for the first exchange to complete.
+        Thread.sleep(500);
+        Long ts2 = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNotNull(ts2);
+        String id2 = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNotNull(id2);
+
+        // Lets verify the oldest changed.
+        assertTrue(!id2.equals(id));
+        assertTrue(ts2 > ts);
+
+        // Lets wait for all the exchanges to complete.
+        Thread.sleep(500);
+
+        assertMockEndpointsSatisfied();
+
+        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+        assertEquals(0, inflight.longValue());
+        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNull(ts);
+        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNull(id);
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Long delay = (Long) exchange.getIn().getBody();
+                                Thread.sleep(delay.longValue());
+                            }
+                        })
+                        .to("mock:result").id("mock");
+            }
+        };
+    }
+
+}
\ No newline at end of file