You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/24 11:07:23 UTC

[2/2] camel git commit: CAMEL-9877: InflightRepository - Add browse that can limit per route

CAMEL-9877: InflightRepository - Add browse that can limit per route


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/137305e7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/137305e7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/137305e7

Branch: refs/heads/master
Commit: 137305e75983e1c688e148b0c1911372207d7214
Parents: 1d828c9
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 24 10:39:38 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 24 11:07:08 2016 +0200

----------------------------------------------------------------------
 .../mbean/ManagedInflightRepositoryMBean.java   |  3 +
 .../camel/impl/DefaultInflightRepository.java   | 40 ++++++++++-
 .../mbean/ManagedInflightRepository.java        | 15 ++--
 .../apache/camel/spi/InflightRepository.java    | 31 +++++++++
 .../InflightRepositoryBrowseFromRouteTest.java  | 72 ++++++++++++++++++++
 .../impl/InflightRepositoryBrowseTest.java      |  3 +-
 6 files changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
index 0f74996..d63920c 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
@@ -35,4 +35,7 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean {
     @ManagedOperation(description = "Lists all the exchanges which are currently inflight, limited and sorted")
     TabularData browse(int limit, boolean sortByLongestDuration);
 
+    @ManagedOperation(description = "List all the exchanges that origins from the given route, which are currently inflight, limited and sorted")
+    TabularData browse(String fromRouteId, int limit, boolean sortByLongestDuration);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index ab0079b..6b65c24 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -89,14 +89,38 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
 
     @Override
     public Collection<InflightExchange> browse() {
-        return browse(-1, false);
+        return browse(null, -1, false);
+    }
+
+    @Override
+    public Collection<InflightExchange> browse(String fromRouteId) {
+        return browse(fromRouteId, -1, false);
     }
 
     @Override
     public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
+        return browse(null, limit, sortByLongestDuration);
+    }
+
+    @Override
+    public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) {
         List<InflightExchange> answer = new ArrayList<InflightExchange>();
 
-        List<Exchange> values = new ArrayList<Exchange>(inflight.values());
+        List<Exchange> values;
+        if (fromRouteId == null) {
+            // all values
+            values = new ArrayList<Exchange>(inflight.values());
+        } else {
+            // only if route match
+            values = new ArrayList<Exchange>();
+            for (Exchange exchange : inflight.values()) {
+                String exchangeRouteId = exchange.getFromRouteId();
+                if (fromRouteId.equals(exchangeRouteId)) {
+                    values.add(exchange);
+                }
+            }
+        }
+
         if (sortByLongestDuration) {
             Collections.sort(values, new Comparator<Exchange>() {
                 @Override
@@ -202,8 +226,18 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
         }
 
         @Override
-        @SuppressWarnings("unchecked")
+        public String getFromRouteId() {
+            return exchange.getFromRouteId();
+        }
+
+        @Override
         public String getRouteId() {
+            return getAtRouteId();
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public String getAtRouteId() {
             List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
             if (list == null || list.isEmpty()) {
                 return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
index cf62de7..3d2daed 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
@@ -59,27 +59,32 @@ public class ManagedInflightRepository extends ManagedService implements Managed
 
     @Override
     public TabularData browse() {
-        return browse(-1, false);
+        return browse(null, -1, false);
     }
 
     @Override
     public TabularData browse(int limit, boolean sortByLongestDuration) {
+        return browse(null, limit, sortByLongestDuration);
+    }
+
+    @Override
+    public TabularData browse(String routeId, int limit, boolean sortByLongestDuration) {
         try {
             TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType());
-            Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(limit, sortByLongestDuration);
+            Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(routeId, limit, sortByLongestDuration);
 
             for (InflightRepository.InflightExchange entry : exchanges) {
                 CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType();
                 String exchangeId = entry.getExchange().getExchangeId();
-                String fromRouteId = entry.getExchange().getFromRouteId();
-                String routeId = entry.getRouteId();
+                String fromRouteId = entry.getFromRouteId();
+                String atRouteId = entry.getAtRouteId();
                 String nodeId = entry.getNodeId();
                 String elapsed = "" + entry.getElapsed();
                 String duration = "" + entry.getDuration();
 
                 CompositeData data = new CompositeDataSupport(ct,
                         new String[]{"exchangeId", "fromRouteId", "routeId", "nodeId", "elapsed", "duration"},
-                        new Object[]{exchangeId, fromRouteId, routeId, nodeId, elapsed, duration});
+                        new Object[]{exchangeId, fromRouteId, atRouteId, nodeId, elapsed, duration});
                 answer.put(data);
             }
             return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
index f7aad78..3fbd710 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -57,12 +57,26 @@ public interface InflightRepository extends StaticService {
         String getNodeId();
 
         /**
+         * The id of the route where the exchange originates (started)
+         */
+        String getFromRouteId();
+
+        /**
          * The id of the route where the exchange currently is being processed
          * <p/>
          * Is <tt>null</tt> if message history is disabled.
+         * @deprecated use {@link #getAtRouteId()}
          */
+        @Deprecated
         String getRouteId();
 
+        /**
+         * The id of the route where the exchange currently is being processed
+         * <p/>
+         * Is <tt>null</tt> if message history is disabled.
+         */
+        String getAtRouteId();
+
     }
 
     /**
@@ -136,6 +150,13 @@ public interface InflightRepository extends StaticService {
     Collection<InflightExchange> browse();
 
     /**
+     * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight that started from the given route.
+     *
+     * @param fromRouteId  the route id, or <tt>null</tt> for all routes.
+     */
+    Collection<InflightExchange> browse(String fromRouteId);
+
+    /**
      * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight.
      *
      * @param limit maximum number of entries to return
@@ -144,4 +165,14 @@ public interface InflightRepository extends StaticService {
      */
     Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration);
 
+    /**
+     * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight that started from the given route.
+     *
+     * @param fromRouteId  the route id, or <tt>null</tt> for all routes.
+     * @param limit maximum number of entries to return
+     * @param sortByLongestDuration to sort by the longest duration. Set to <tt>true</tt> to include the exchanges that have been inflight the longest time,
+     *                              set to <tt>false</tt> to sort by exchange id
+     */
+    Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
new file mode 100644
index 0000000..d1285d0
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InflightRepository;
+
+/**
+ * @version
+ */
+public class InflightRepositoryBrowseFromRouteTest extends ContextTestSupport {
+
+    public void testInflight() throws Exception {
+        assertEquals(0, context.getInflightRepository().browse().size());
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertEquals(0, context.getInflightRepository().browse().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("foo")
+                    .to("mock:a")
+                    .to("direct:bar")
+                    .to("mock:result");
+
+                from("direct:bar").routeId("bar")
+                        .to("mock:b")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Collection<InflightRepository.InflightExchange> list = context.getInflightRepository().browse("foo");
+                                assertEquals(1, list.size());
+
+                                InflightRepository.InflightExchange inflight = list.iterator().next();
+                                assertNotNull(inflight);
+
+                                assertEquals(exchange, inflight.getExchange());
+                                assertEquals("foo", inflight.getFromRouteId());
+                                assertEquals("bar", inflight.getAtRouteId());
+                                assertEquals("myProcessor", inflight.getNodeId());
+                            }
+                        }).id("myProcessor");
+
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/137305e7/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
index 76b65cf..bed1f0f 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
@@ -54,7 +54,8 @@ public class InflightRepositoryBrowseTest extends ContextTestSupport {
                                 assertNotNull(inflight);
 
                                 assertEquals(exchange, inflight.getExchange());
-                                assertEquals("foo", inflight.getRouteId());
+                                assertEquals("foo", inflight.getFromRouteId());
+                                assertEquals("foo", inflight.getAtRouteId());
                                 assertEquals("myProcessor", inflight.getNodeId());
                             }
                         }).id("myProcessor")