You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2015/03/02 15:14:20 UTC
camel git commit: Fixes CAMEL-8385: Add a OldestInflightDuration and
OldestInflightExchangeId attribute to route MBeans
Repository: camel
Updated Branches:
refs/heads/master 552fe3d1b -> 8f6d4adb0
Fixes CAMEL-8385: Add a OldestInflightDuration and OldestInflightExchangeId attribute to route MBeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f6d4adb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f6d4adb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f6d4adb
Branch: refs/heads/master
Commit: 8f6d4adb0161c524303e9bf7688c45976b89084a
Parents: 552fe3d
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Sat Feb 21 10:46:00 2015 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Mar 2 09:14:00 2015 -0500
----------------------------------------------------------------------
.../api/management/mbean/ManagedRouteMBean.java | 7 ++
.../camel/management/mbean/ManagedRoute.java | 124 ++++++++++++++++++-
.../ManagedInflightStatisticsTest.java | 113 +++++++++++++++++
3 files changed, 239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index 148c688..7083a0f 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -114,4 +114,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
@ManagedOperation(description = "Returns the JSON representation of all the static endpoints (and possible dynamic) defined in this route")
String createRouteStaticEndpointJson(boolean includeDynamic);
+ @ManagedAttribute(description = "Oldest inflight exchange duration")
+ Long getOldestInflightDuration();
+
+ @ManagedAttribute(description = "Oldest inflight exchange id")
+ String getOldestInflightExchangeId();
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index d186188..fba8b62 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import javax.management.AttributeValueExp;
import javax.management.MBeanServer;
@@ -33,6 +35,7 @@ import javax.management.QueryExp;
import javax.management.StringValueExp;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.ManagementStatisticsLevel;
import org.apache.camel.Route;
import org.apache.camel.ServiceStatus;
@@ -43,6 +46,7 @@ import org.apache.camel.api.management.mbean.ManagedRouteMBean;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.ModelHelper;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.ObjectHelper;
@@ -53,6 +57,8 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
protected final String description;
protected final ModelCamelContext context;
private final LoadTriplet load = new LoadTriplet();
+ private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>();
+ private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>();
public ManagedRoute(ModelCamelContext context, Route route) {
this.route = route;
@@ -178,7 +184,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
public void onTimer() {
load.update(getInflightExchanges());
}
-
+
public void start() throws Exception {
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
@@ -211,7 +217,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- String routeId = getRouteId();
+ String routeId = getRouteId();
context.stopRoute(routeId);
context.removeRoute(routeId);
}
@@ -220,7 +226,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- String routeId = getRouteId();
+ String routeId = getRouteId();
context.stopRoute(routeId, timeout, TimeUnit.SECONDS);
context.removeRoute(routeId);
}
@@ -329,6 +335,15 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
String stat = dumpStatsAsXml(fullStats);
answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
+ answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
+ InFlightKey oldestInflightEntry = getOldestInflightEntry();
+ if (oldestInflightEntry != null) {
+ answer.append(" oldestInflightExchangeId=\"\"");
+ answer.append(" oldestInflightDuration=\"\"");
+ } else {
+ answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\"");
+ answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\"");
+ }
answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
if (includeProcessors) {
@@ -369,7 +384,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
@Override
public boolean equals(Object o) {
- return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute)o).route));
+ return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route));
}
@Override
@@ -377,6 +392,106 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
return route.hashCode();
}
+ private InFlightKey getOldestInflightEntry() {
+ Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry();
+ if (entry != null) {
+ return entry.getKey();
+ }
+ return null;
+ }
+
+ public Long getOldestInflightDuration() {
+ InFlightKey oldest = getOldestInflightEntry();
+ if (oldest == null) {
+ return null;
+ }
+ return System.currentTimeMillis() - oldest.timeStamp;
+ }
+
+ public String getOldestInflightExchangeId() {
+ InFlightKey oldest = getOldestInflightEntry();
+ if (oldest == null) {
+ return null;
+ }
+ return oldest.exchangeId;
+ }
+
+ @Override
+ public void init(ManagementStrategy strategy) {
+ super.init(strategy);
+ exchangesInFlightStartTimestamps.clear();
+ }
+
+ @Override
+ public synchronized void processExchange(Exchange exchange) {
+ InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId());
+ exchangesInFlightKeys.put(exchange.getExchangeId(), key);
+ exchangesInFlightStartTimestamps.put(key, key.timeStamp);
+ super.processExchange(exchange);
+ }
+
+ @Override
+ public synchronized void completedExchange(Exchange exchange, long time) {
+ InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId());
+ if (key != null) {
+ exchangesInFlightStartTimestamps.remove(key);
+ }
+ super.completedExchange(exchange, time);
+ }
+
+ private static class InFlightKey implements Comparable<InFlightKey> {
+
+ private final Long timeStamp;
+ private final String exchangeId;
+
+ InFlightKey(Long timeStamp, String exchangeId) {
+ this.exchangeId = exchangeId;
+ this.timeStamp = timeStamp;
+ }
+
+ @Override
+ public int compareTo(InFlightKey o) {
+ int compare = Long.compare(timeStamp, o.timeStamp);
+ if (compare == 0) {
+ return exchangeId.compareTo(o.exchangeId);
+ }
+ return compare;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InFlightKey that = (InFlightKey) o;
+
+ if (!exchangeId.equals(that.exchangeId)) {
+ return false;
+ }
+ if (!timeStamp.equals(that.timeStamp)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = timeStamp.hashCode();
+ result = 31 * result + exchangeId.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return exchangeId;
+ }
+ }
+
/**
* Used for sorting the processor mbeans accordingly to their index.
*/
@@ -387,5 +502,4 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
return o1.getIndex().compareTo(o2.getIndex());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
new file mode 100644
index 0000000..9807a44
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class ManagedInflightStatisticsTest extends ManagementTestSupport {
+
+ public void testManageStatistics() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+
+ Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
+ assertEquals(1, set.size());
+ ObjectName on = set.iterator().next();
+
+ Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+ assertEquals(0, inflight.longValue());
+ Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+ assertNull(ts);
+ String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+ assertNull(id);
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(2);
+
+ // start some exchanges.
+ template.asyncSendBody("direct:start", 1000L);
+ Thread.sleep(500);
+ template.asyncSendBody("direct:start", 1000L);
+ Thread.sleep(100);
+
+ inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+ assertEquals(2, inflight.longValue());
+
+ ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+ assertNotNull(ts);
+ id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+ assertNotNull(id);
+
+ // Lets wait for the first exchange to complete.
+ Thread.sleep(500);
+ Long ts2 = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+ assertNotNull(ts2);
+ String id2 = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+ assertNotNull(id2);
+
+ // Lets verify the oldest changed.
+ assertTrue(!id2.equals(id));
+ assertTrue(ts2 > ts);
+
+ // Lets wait for all the exchanges to complete.
+ Thread.sleep(500);
+
+ assertMockEndpointsSatisfied();
+
+ inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+ assertEquals(0, inflight.longValue());
+ ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+ assertNull(ts);
+ id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+ assertNull(id);
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Long delay = (Long) exchange.getIn().getBody();
+ Thread.sleep(delay.longValue());
+ }
+ })
+ .to("mock:result").id("mock");
+ }
+ };
+ }
+
+}
\ No newline at end of file