You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/10 05:53:13 UTC

[camel] branch main updated: CAMEL-18558: camel-jbang - get inflight and blocked exchanges

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c624660733 CAMEL-18558: camel-jbang - get inflight and blocked exchanges
1c624660733 is described below

commit 1c624660733a4079ebb32720b24f25075cb602c4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 10 07:52:58 2022 +0200

    CAMEL-18558: camel-jbang - get inflight and blocked exchanges
---
 .../camel/spi/AsyncProcessorAwaitManager.java      |   4 +-
 .../services/org/apache/camel/dev-console/blocked  |   2 +
 .../apache/camel/impl/console/BlockedConsole.java  |  73 ++++++++++
 .../apache/camel/impl/console/InflightConsole.java |  22 ++-
 .../camel/cli/connector/LocalCliConnector.java     |  14 ++
 .../dsl/jbang/core/commands/CamelJBangMain.java    |   4 +
 .../core/commands/process/CamelContextTop.java     |   4 -
 .../jbang/core/commands/process/ListBlocked.java   | 152 +++++++++++++++++++
 .../jbang/core/commands/process/ListInflight.java  | 161 +++++++++++++++++++++
 9 files changed, 428 insertions(+), 8 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 8bbbcad9196..9bd94216b89 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -33,7 +33,7 @@ import org.apache.camel.StaticService;
 public interface AsyncProcessorAwaitManager extends StaticService {
 
     /**
-     * Utilization statistics of the this manager.
+     * Utilization statistics of this manager.
      */
     interface Statistics {
 
@@ -122,7 +122,7 @@ public interface AsyncProcessorAwaitManager extends StaticService {
     }
 
     /**
-     * Process the given exchange sychronously.
+     * Process the given exchange synchronously.
      *
      * @param processor the async processor to call
      * @param exchange  the exchange to process
diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/blocked b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/blocked
new file mode 100644
index 00000000000..5a9113bed0c
--- /dev/null
+++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/blocked
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.BlockedConsole
diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
new file mode 100644
index 00000000000..21af0911912
--- /dev/null
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonObject;
+
+/** Dev console that displays the blocked exchanges tracked by the {@link AsyncProcessorAwaitManager}. */
+@DevConsole("blocked")
+public class BlockedConsole extends AbstractDevConsole {
+
+    public BlockedConsole() {
+        super("camel", "blocked", "Blocked Exchanges", "Display blocked exchanges");
+    }
+
+    @Override
+    protected String doCallText(Map<String, Object> options) {
+        AsyncProcessorAwaitManager manager = awaitManager();
+        StringBuilder text = new StringBuilder(String.format("\n    Blocked: %s", manager.size()));
+        for (AsyncProcessorAwaitManager.AwaitThread blocked : manager.browse()) {
+            String age = TimeUtils.printDuration(blocked.getWaitDuration(), true);
+            text.append(String.format("\n    %s (at: %s/%s age: %s)",
+                    blocked.getExchange().getExchangeId(), blocked.getRouteId(), blocked.getNodeId(), age));
+        }
+        return text.toString();
+    }
+
+    /** Builds the JSON view; the "exchanges" array is only added when at least one exchange is blocked. */
+    @Override
+    protected JsonObject doCallJson(Map<String, Object> options) {
+        AsyncProcessorAwaitManager manager = awaitManager();
+        JsonObject root = new JsonObject();
+        root.put("blocked", manager.size());
+        final List<JsonObject> exchanges = new ArrayList<>();
+        for (AsyncProcessorAwaitManager.AwaitThread blocked : manager.browse()) {
+            JsonObject entry = new JsonObject();
+            entry.put("exchangeId", blocked.getExchange().getExchangeId());
+            entry.put("routeId", blocked.getRouteId());
+            entry.put("nodeId", blocked.getNodeId());
+            entry.put("duration", blocked.getWaitDuration());
+            exchanges.add(entry);
+        }
+        if (!exchanges.isEmpty()) {
+            root.put("exchanges", exchanges);
+        }
+        return root;
+    }
+
+    private AsyncProcessorAwaitManager awaitManager() {
+        return getCamelContext().adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+    }
+}
diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
index a1d7c5c0384..79c4c7cd830 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
@@ -28,19 +28,33 @@ import org.apache.camel.util.json.JsonObject;
 @DevConsole("inflight")
 public class InflightConsole extends AbstractDevConsole {
 
+    /**
+     * Filters the inflight exchanges by route id
+     */
+    public static final String FILTER = "filter";
+
+    /**
+     * Limits the number of entries displayed
+     */
+    public static final String LIMIT = "limit";
+
     public InflightConsole() {
         super("camel", "inflight", "Inflight Exchanges", "Display inflight exchanges");
     }
 
     @Override
     protected String doCallText(Map<String, Object> options) {
+        String filter = (String) options.get(FILTER);
+        String limit = (String) options.get(LIMIT);
+        int max = limit == null ? Integer.MAX_VALUE : Integer.parseInt(limit);
+
         StringBuilder sb = new StringBuilder();
 
         InflightRepository repo = getCamelContext().getInflightRepository();
         sb.append(String.format("\n    Inflight: %s", repo.size()));
         sb.append(String.format("\n    InflightBrowseEnabled: %s", repo.isInflightBrowseEnabled()));
         if (repo.isInflightBrowseEnabled()) {
-            for (InflightRepository.InflightExchange ie : repo.browse()) {
+            for (InflightRepository.InflightExchange ie : repo.browse(filter, max, false)) {
                 String age = TimeUtils.printDuration(ie.getDuration(), true);
                 sb.append(String.format("\n    %s (from: %s at: %s/%s age: %s)",
                         ie.getExchange().getExchangeId(), ie.getFromRouteId(), ie.getAtRouteId(), ie.getNodeId(), age));
@@ -52,6 +66,10 @@ public class InflightConsole extends AbstractDevConsole {
 
     @Override
     protected JsonObject doCallJson(Map<String, Object> options) {
+        String filter = (String) options.get(FILTER);
+        String limit = (String) options.get(LIMIT);
+        int max = limit == null ? Integer.MAX_VALUE : Integer.parseInt(limit);
+
         JsonObject root = new JsonObject();
 
         InflightRepository repo = getCamelContext().getInflightRepository();
@@ -59,7 +77,7 @@ public class InflightConsole extends AbstractDevConsole {
         root.put("inflightBrowseEnabled", repo.isInflightBrowseEnabled());
         if (repo.isInflightBrowseEnabled()) {
             final List<JsonObject> list = new ArrayList<>();
-            for (InflightRepository.InflightExchange ie : repo.browse()) {
+            for (InflightRepository.InflightExchange ie : repo.browse(filter, max, false)) {
                 JsonObject props = new JsonObject();
                 props.put("exchangeId", ie.getExchange().getExchangeId());
                 props.put("fromRouteId", ie.getFromRouteId());
diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 62b5243fba9..15c476dccb0 100644
--- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -376,6 +376,20 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C
                         root.put("logger", json);
                     }
                 }
+                DevConsole dc7 = dcr.resolveById("inflight");
+                if (dc7 != null) {
+                    JsonObject json = (JsonObject) dc7.call(DevConsole.MediaType.JSON);
+                    if (json != null && !json.isEmpty()) {
+                        root.put("inflight", json);
+                    }
+                }
+                DevConsole dc8 = dcr.resolveById("blocked");
+                if (dc8 != null) {
+                    JsonObject json = (JsonObject) dc8.call(DevConsole.MediaType.JSON);
+                    if (json != null && !json.isEmpty()) {
+                        root.put("blocked", json);
+                    }
+                }
             }
             // various details
             JsonObject services = collectServices();
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index 681343cf276..e5b823631ea 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -48,7 +48,9 @@ import org.apache.camel.dsl.jbang.core.commands.process.CamelStatus;
 import org.apache.camel.dsl.jbang.core.commands.process.CamelTop;
 import org.apache.camel.dsl.jbang.core.commands.process.Hawtio;
 import org.apache.camel.dsl.jbang.core.commands.process.Jolokia;
+import org.apache.camel.dsl.jbang.core.commands.process.ListBlocked;
 import org.apache.camel.dsl.jbang.core.commands.process.ListEvent;
+import org.apache.camel.dsl.jbang.core.commands.process.ListInflight;
 import org.apache.camel.dsl.jbang.core.commands.process.ListProcess;
 import org.apache.camel.dsl.jbang.core.commands.process.ListService;
 import org.apache.camel.dsl.jbang.core.commands.process.ListVault;
@@ -73,6 +75,8 @@ public class CamelJBangMain implements Callable<Integer> {
                         .addSubcommand("processor", new CommandLine(new CamelProcessorStatus(main)))
                         .addSubcommand("endpoint", new CommandLine(new CamelEndpointStatus(main)))
                         .addSubcommand("event", new CommandLine(new ListEvent(main)))
+                        .addSubcommand("inflight", new CommandLine(new ListInflight(main)))
+                        .addSubcommand("blocked", new CommandLine(new ListBlocked(main)))
                         .addSubcommand("service", new CommandLine(new ListService(main)))
                         .addSubcommand("source", new CommandLine(new CamelSourceAction(main)))
                         .addSubcommand("vault", new CommandLine(new ListVault(main))))
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextTop.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextTop.java
index 6cdd5ae989e..f6283381267 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextTop.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextTop.java
@@ -208,10 +208,6 @@ public class CamelContextTop extends ProcessBaseCommand {
         return r.threadCount + "/" + r.peakThreadCount;
     }
 
-    private String getClassLoading(Row r) {
-        return r.loadedClassCount + "/" + r.totalLoadedClassCount;
-    }
-
     private String getGC(Row r) {
         if (r.gcTime <= 0) {
             return "";
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBlocked.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBlocked.java
new file mode 100644
index 00000000000..cd8494599d0
--- /dev/null
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBlocked.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.process;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/** Lists the blocked exchanges reported by running Camel integrations, one table row per exchange. */
+@Command(name = "blocked",
+         description = "Get blocked messages of Camel integrations")
+public class ListBlocked extends ProcessBaseCommand {
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--sort" },
+                        description = "Sort by pid, name or age", defaultValue = "pid")
+    String sort;
+
+    public ListBlocked(CamelJBangMain main) {
+        super(main);
+    }
+
+    /** Collects the blocked exchanges of every matching process and prints them as a table; returns 0. */
+    @Override
+    public Integer call() throws Exception {
+        List<Row> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    // there must be a status file for the running Camel integration
+                    JsonObject context = root != null ? (JsonObject) root.get("context") : null;
+                    if (context == null) {
+                        return;
+                    }
+                    Row base = new Row();
+                    base.name = context.getString("name");
+                    if ("CamelJBang".equals(base.name)) {
+                        base.name = extractName(root, ph);
+                    }
+                    base.pid = "" + ph.pid();
+                    base.uptime = extractSince(ph);
+                    base.age = TimeUtils.printSince(base.uptime);
+                    JsonObject blocked = (JsonObject) root.get("blocked");
+                    JsonArray exchanges = blocked != null ? (JsonArray) blocked.get("exchanges") : null;
+                    if (exchanges == null) {
+                        return;
+                    }
+                    for (int i = 0; i < exchanges.size(); i++) {
+                        JsonObject exchange = (JsonObject) exchanges.get(i);
+                        Row row = base.copy();
+                        row.exchangeId = exchange.getString("exchangeId");
+                        row.routeId = exchange.getString("routeId");
+                        row.nodeId = exchange.getString("nodeId");
+                        row.duration = exchange.getLong("duration");
+                        rows.add(row);
+                    }
+                });
+
+        rows.sort(this::sortRow);
+
+        if (!rows.isEmpty()) {
+            System.out.println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList(
+                    new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r -> r.exchangeId),
+                    new Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.routeId),
+                    new Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.nodeId),
+                    new Column().header("DURATION").dataAlign(HorizontalAlign.RIGHT).with(this::getDuration))));
+        }
+
+        return 0;
+    }
+
+    /** Comparator for the {@code --sort} option; a leading {@code -} reverses it, unknown keys keep the order. */
+    protected int sortRow(Row o1, Row o2) {
+        String s = sort;
+        int negate = 1;
+        if (s.startsWith("-")) {
+            s = s.substring(1);
+            negate = -1;
+        }
+        switch (s) {
+            case "pid":
+                return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * negate;
+            case "name":
+                return o1.name.compareToIgnoreCase(o2.name) * negate;
+            case "age":
+                return Long.compare(o1.uptime, o2.uptime) * negate;
+            default:
+                return 0;
+        }
+    }
+
+    private String getDuration(Row r) {
+        return TimeUtils.printDuration(r.duration);
+    }
+
+    /** One table row: the owning process plus a single blocked exchange. */
+    private static class Row implements Cloneable {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        String exchangeId;
+        String routeId;
+        String nodeId;
+        long duration;
+
+        Row copy() {
+            try {
+                return (Row) clone();
+            } catch (CloneNotSupportedException e) {
+                // Row is Cloneable so this is unexpected; fail here rather than return null to the caller
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
new file mode 100644
index 00000000000..e27de1b9a56
--- /dev/null
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.process;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/** Lists the inflight exchanges reported by running Camel integrations, one table row per exchange. */
+@Command(name = "inflight",
+         description = "Get inflight messages of Camel integrations")
+public class ListInflight extends ProcessBaseCommand {
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--sort" },
+                        description = "Sort by pid, name or age", defaultValue = "pid")
+    String sort;
+
+    public ListInflight(CamelJBangMain main) {
+        super(main);
+    }
+
+    /** Collects the inflight exchanges of every matching process and prints them as a table; returns 0. */
+    @Override
+    public Integer call() throws Exception {
+        List<Row> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    // there must be a status file for the running Camel integration
+                    JsonObject context = root != null ? (JsonObject) root.get("context") : null;
+                    if (context == null) {
+                        return;
+                    }
+                    Row base = new Row();
+                    base.name = context.getString("name");
+                    if ("CamelJBang".equals(base.name)) {
+                        base.name = extractName(root, ph);
+                    }
+                    base.pid = "" + ph.pid();
+                    base.uptime = extractSince(ph);
+                    base.age = TimeUtils.printSince(base.uptime);
+                    JsonObject inflight = (JsonObject) root.get("inflight");
+                    JsonArray exchanges = inflight != null ? (JsonArray) inflight.get("exchanges") : null;
+                    if (exchanges == null) {
+                        return;
+                    }
+                    for (int i = 0; i < exchanges.size(); i++) {
+                        JsonObject exchange = (JsonObject) exchanges.get(i);
+                        Row row = base.copy();
+                        row.exchangeId = exchange.getString("exchangeId");
+                        row.fromRouteId = exchange.getString("fromRouteId");
+                        row.atRouteId = exchange.getString("atRouteId");
+                        row.nodeId = exchange.getString("nodeId");
+                        row.elapsed = exchange.getLong("elapsed");
+                        row.duration = exchange.getLong("duration");
+                        rows.add(row);
+                    }
+                });
+
+        rows.sort(this::sortRow);
+
+        if (!rows.isEmpty()) {
+            System.out.println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList(
+                    new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r -> r.exchangeId),
+                    new Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.atRouteId),
+                    new Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.nodeId),
+                    new Column().header("ELAPSED").dataAlign(HorizontalAlign.RIGHT).with(this::getElapsed),
+                    new Column().header("DURATION").dataAlign(HorizontalAlign.RIGHT).with(this::getDuration))));
+        }
+
+        return 0;
+    }
+
+    /** Comparator for the {@code --sort} option; a leading {@code -} reverses it, unknown keys keep the order. */
+    protected int sortRow(Row o1, Row o2) {
+        String s = sort;
+        int negate = 1;
+        if (s.startsWith("-")) {
+            s = s.substring(1);
+            negate = -1;
+        }
+        switch (s) {
+            case "pid":
+                return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * negate;
+            case "name":
+                return o1.name.compareToIgnoreCase(o2.name) * negate;
+            case "age":
+                return Long.compare(o1.uptime, o2.uptime) * negate;
+            default:
+                return 0;
+        }
+    }
+
+    private String getDuration(Row r) {
+        return TimeUtils.printDuration(r.duration);
+    }
+
+    private String getElapsed(Row r) {
+        return TimeUtils.printDuration(r.elapsed);
+    }
+
+    /** One table row: the owning process plus a single inflight exchange. */
+    private static class Row implements Cloneable {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        String exchangeId;
+        String fromRouteId;
+        String atRouteId;
+        String nodeId;
+        long elapsed;
+        long duration;
+
+        Row copy() {
+            try {
+                return (Row) clone();
+            } catch (CloneNotSupportedException e) {
+                // Row is Cloneable so this is unexpected; fail here rather than return null to the caller
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}