You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/24 11:07:23 UTC
[2/2] camel git commit: CAMEL-9877: InflightRepository - Add browse that can limit per route
CAMEL-9877: InflightRepository - Add browse that can limit per route
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/137305e7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/137305e7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/137305e7
Branch: refs/heads/master
Commit: 137305e75983e1c688e148b0c1911372207d7214
Parents: 1d828c9
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 24 10:39:38 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 24 11:07:08 2016 +0200
----------------------------------------------------------------------
.../mbean/ManagedInflightRepositoryMBean.java | 3 +
.../camel/impl/DefaultInflightRepository.java | 40 ++++++++++-
.../mbean/ManagedInflightRepository.java | 15 ++--
.../apache/camel/spi/InflightRepository.java | 31 +++++++++
.../InflightRepositoryBrowseFromRouteTest.java | 72 ++++++++++++++++++++
.../impl/InflightRepositoryBrowseTest.java | 3 +-
6 files changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
index 0f74996..d63920c 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
@@ -35,4 +35,7 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean {
@ManagedOperation(description = "Lists all the exchanges which are currently inflight, limited and sorted")
TabularData browse(int limit, boolean sortByLongestDuration);
+ @ManagedOperation(description = "List all the exchanges that origins from the given route, which are currently inflight, limited and sorted")
+ TabularData browse(String fromRouteId, int limit, boolean sortByLongestDuration);
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index ab0079b..6b65c24 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -89,14 +89,38 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
public Collection<InflightExchange> browse() {
- return browse(-1, false);
+ return browse(null, -1, false);
+ }
+
+ @Override
+ public Collection<InflightExchange> browse(String fromRouteId) {
+ return browse(fromRouteId, -1, false);
}
@Override
public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
+ return browse(null, limit, sortByLongestDuration);
+ }
+
+ @Override
+ public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) {
List<InflightExchange> answer = new ArrayList<InflightExchange>();
- List<Exchange> values = new ArrayList<Exchange>(inflight.values());
+ List<Exchange> values;
+ if (fromRouteId == null) {
+ // all values
+ values = new ArrayList<Exchange>(inflight.values());
+ } else {
+ // only if route match
+ values = new ArrayList<Exchange>();
+ for (Exchange exchange : inflight.values()) {
+ String exchangeRouteId = exchange.getFromRouteId();
+ if (fromRouteId.equals(exchangeRouteId)) {
+ values.add(exchange);
+ }
+ }
+ }
+
if (sortByLongestDuration) {
Collections.sort(values, new Comparator<Exchange>() {
@Override
@@ -202,8 +226,18 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
}
@Override
- @SuppressWarnings("unchecked")
+ public String getFromRouteId() {
+ return exchange.getFromRouteId();
+ }
+
+ @Override
public String getRouteId() {
+ return getAtRouteId();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String getAtRouteId() {
List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
if (list == null || list.isEmpty()) {
return null;
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
index cf62de7..3d2daed 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
@@ -59,27 +59,32 @@ public class ManagedInflightRepository extends ManagedService implements Managed
@Override
public TabularData browse() {
- return browse(-1, false);
+ return browse(null, -1, false);
}
@Override
public TabularData browse(int limit, boolean sortByLongestDuration) {
+ return browse(null, limit, sortByLongestDuration);
+ }
+
+ @Override
+ public TabularData browse(String routeId, int limit, boolean sortByLongestDuration) {
try {
TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType());
- Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(limit, sortByLongestDuration);
+ Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(routeId, limit, sortByLongestDuration);
for (InflightRepository.InflightExchange entry : exchanges) {
CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType();
String exchangeId = entry.getExchange().getExchangeId();
- String fromRouteId = entry.getExchange().getFromRouteId();
- String routeId = entry.getRouteId();
+ String fromRouteId = entry.getFromRouteId();
+ String atRouteId = entry.getAtRouteId();
String nodeId = entry.getNodeId();
String elapsed = "" + entry.getElapsed();
String duration = "" + entry.getDuration();
CompositeData data = new CompositeDataSupport(ct,
new String[]{"exchangeId", "fromRouteId", "routeId", "nodeId", "elapsed", "duration"},
- new Object[]{exchangeId, fromRouteId, routeId, nodeId, elapsed, duration});
+ new Object[]{exchangeId, fromRouteId, atRouteId, nodeId, elapsed, duration});
answer.put(data);
}
return answer;
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
index f7aad78..3fbd710 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -57,12 +57,26 @@ public interface InflightRepository extends StaticService {
String getNodeId();
/**
+ * The id of the route where the exchange originates (started)
+ */
+ String getFromRouteId();
+
+ /**
* The id of the route where the exchange currently is being processed
* <p/>
* Is <tt>null</tt> if message history is disabled.
+ * @deprecated use {@link #getAtRouteId()}
*/
+ @Deprecated
String getRouteId();
+ /**
+ * The id of the route where the exchange currently is being processed
+ * <p/>
+ * Is <tt>null</tt> if message history is disabled.
+ */
+ String getAtRouteId();
+
}
/**
@@ -136,6 +150,13 @@ public interface InflightRepository extends StaticService {
Collection<InflightExchange> browse();
/**
+ * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight and started from the given route.
+ *
+ * @param fromRouteId the route id, or <tt>null</tt> for all routes.
+ */
+ Collection<InflightExchange> browse(String fromRouteId);
+
+ /**
* A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight.
*
* @param limit maximum number of entries to return
@@ -144,4 +165,14 @@ public interface InflightRepository extends StaticService {
*/
Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration);
+ /**
+ * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight and started from the given route.
+ *
+ * @param fromRouteId the route id, or <tt>null</tt> for all routes.
+ * @param limit maximum number of entries to return
+ * @param sortByLongestDuration to sort by the longest duration. Set to <tt>true</tt> to include the exchanges that have been inflight the longest time,
+ * set to <tt>false</tt> to sort by exchange id
+ */
+ Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration);
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
new file mode 100644
index 0000000..d1285d0
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InflightRepository;
+
+/**
+ * @version
+ */
+public class InflightRepositoryBrowseFromRouteTest extends ContextTestSupport {
+
+ public void testInflight() throws Exception {
+ assertEquals(0, context.getInflightRepository().browse().size());
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertEquals(0, context.getInflightRepository().browse().size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("foo")
+ .to("mock:a")
+ .to("direct:bar")
+ .to("mock:result");
+
+ from("direct:bar").routeId("bar")
+ .to("mock:b")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Collection<InflightRepository.InflightExchange> list = context.getInflightRepository().browse("foo");
+ assertEquals(1, list.size());
+
+ InflightRepository.InflightExchange inflight = list.iterator().next();
+ assertNotNull(inflight);
+
+ assertEquals(exchange, inflight.getExchange());
+ assertEquals("foo", inflight.getFromRouteId());
+ assertEquals("bar", inflight.getAtRouteId());
+ assertEquals("myProcessor", inflight.getNodeId());
+ }
+ }).id("myProcessor");
+
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
index 76b65cf..bed1f0f 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
@@ -54,7 +54,8 @@ public class InflightRepositoryBrowseTest extends ContextTestSupport {
assertNotNull(inflight);
assertEquals(exchange, inflight.getExchange());
- assertEquals("foo", inflight.getRouteId());
+ assertEquals("foo", inflight.getFromRouteId());
+ assertEquals("foo", inflight.getAtRouteId());
assertEquals("myProcessor", inflight.getNodeId());
}
}).id("myProcessor")