You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/03 18:18:44 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2003 - Add bridge metrics

ARTEMIS-2003 - Add bridge metrics

This commit adds support for tracking metrics for both normal
bridges and bridges that are part of a cluster. The two
statistics added in this commit are messages pending acknowledgement
and messages acknowledged, but more can be added later.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e629ac45
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e629ac45
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e629ac45

Branch: refs/heads/master
Commit: e629ac4538988cbb957a9a2599b2430d4266c943
Parents: f980c34
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 31 13:01:32 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 3 14:18:34 2018 -0400

----------------------------------------------------------------------
 .../api/core/management/BridgeControl.java      | 29 ++++++++
 .../management/ClusterConnectionControl.java    | 48 ++++++++++++++
 .../core/management/impl/BridgeControlImpl.java | 35 +++++++++-
 .../impl/ClusterConnectionControlImpl.java      | 48 +++++++++++++-
 .../artemis/core/server/cluster/Bridge.java     |  3 +
 .../core/server/cluster/ClusterConnection.java  | 16 +++++
 .../core/server/cluster/impl/BridgeImpl.java    | 22 ++++++-
 .../core/server/cluster/impl/BridgeMetrics.java | 69 ++++++++++++++++++++
 .../cluster/impl/ClusterConnectionImpl.java     | 20 ++++++
 .../cluster/impl/ClusterConnectionMetrics.java  | 64 ++++++++++++++++++
 .../integration/cluster/bridge/BridgeTest.java  |  6 ++
 .../cluster/distribution/ClusterTestBase.java   | 18 +++++
 .../distribution/OneWayChainClusterTest.java    |  9 +++
 .../distribution/OnewayTwoNodeClusterTest.java  | 18 +++++
 .../management/BridgeControlTest.java           |  6 ++
 .../management/BridgeControlUsingCoreTest.java  |  8 +++
 .../ClusterConnectionControlTest.java           | 14 +++-
 .../ClusterConnectionControlUsingCoreTest.java  | 22 +++++++
 18 files changed, 445 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
index 3bf4554..9dd7dc8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
@@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl {
     */
    @Attribute(desc = "whether this bridge is using high availability")
    boolean isHA();
+
+   /**
+    * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but
+    * is awaiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding
+    * pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+    *
+    */
+   @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.")
+   long getMessagesPendingAcknowledgement();
+
+   /**
+    * The messagesAcknowledged counter is the number of messages actually received by the remote broker.
+    * This is a cumulative total and the number of outstanding pending messages can be computed by subtracting
+    * messagesAcknowledged from messagesPendingAcknowledgement.
+    *
+    */
+   @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
+   long getMessagesAcknowledged();
+
+   /**
+    * The bridge metrics for this bridge
+    *
+    * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but is awaiting acknowledgement from the other broker.
+    * The messagesAcknowledged counter is the number of messages actually received by the remote broker.
+    *
+    */
+   @Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
+   Map<String, Object> getMetrics();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
index 194afad..39f0825 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
@@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
     */
    @Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)")
    Map<String, String> getNodes() throws Exception;
+
+   /**
+    * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
+    * forwarded a message and is awaiting acknowledgement from the other broker. (aggregate over all bridges)
+    *
+    * This is a cumulative total and the number of outstanding pending messages for the cluster connection
+    * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+    *
+    */
+   @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)")
+   long getMessagesPendingAcknowledgement();
+
+   /**
+    * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
+    * bridges in this cluster connection
+    *
+    * This is a cumulative total and the number of outstanding pending messages for the cluster connection
+    * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+    *
+    */
+   @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
+   long getMessagesAcknowledged();
+
+   /**
+    * The current metrics for this cluster connection (aggregate over all bridges to other nodes)
+    *
+    * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
+    * forwarded a message and is awaiting acknowledgement from the other broker.
+    *
+    * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
+    * bridges in this cluster connection
+    *
+    * @return
+    */
+   @Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
+   Map<String, Object> getMetrics();
+
+   /**
+    * The bridge metrics for the given node in the cluster connection
+    *
+    * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but is awaiting acknowledgement from the other broker.
+    * The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.
+    *
+    * @throws Exception
+    */
+   @Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
+   Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
index 6e0e055..d0e5523 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.core.management.impl;
 
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanOperationInfo;
 import java.util.List;
 import java.util.Map;
 
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanOperationInfo;
+
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.management.BridgeControl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -228,6 +229,36 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
       return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class);
    }
 
+   @Override
+   public long getMessagesPendingAcknowledgement() {
+      clearIO();
+      try {
+         return bridge.getMetrics().getMessagesPendingAcknowledgement();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      clearIO();
+      try {
+         return bridge.getMetrics().getMessagesAcknowledged();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public Map<String, Object> getMetrics() {
+      clearIO();
+      try {
+         return bridge.getMetrics().convertToMap();
+      } finally {
+         blockOnIO();
+      }
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
index 9186dbb..24c7dcc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
@@ -16,16 +16,18 @@
  */
 package org.apache.activemq.artemis.core.management.impl;
 
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanOperationInfo;
 import java.util.List;
 import java.util.Map;
 
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanOperationInfo;
+
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
 import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 
 public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl {
 
@@ -223,6 +225,48 @@ public class ClusterConnectionControlImpl extends AbstractControl implements Clu
       return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class);
    }
 
+   @Override
+   public long getMessagesPendingAcknowledgement() {
+      clearIO();
+      try {
+         return clusterConnection.getMetrics().getMessagesPendingAcknowledgement();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      clearIO();
+      try {
+         return clusterConnection.getMetrics().getMessagesAcknowledged();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public Map<String, Object> getMetrics()  {
+      clearIO();
+      try {
+         return clusterConnection.getMetrics().convertToMap();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public Map<String, Object> getBridgeMetrics(String nodeId) {
+      clearIO();
+      try {
+         final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId);
+         return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null;
+      } finally {
+         blockOnIO();
+      }
+
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
index 7e8cacb..28fbc7c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
@@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
    void disconnect();
 
    boolean isConnected();
+
+   BridgeMetrics getMetrics();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
index 9392ed5..6171476 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
@@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 
 public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
 
@@ -80,4 +82,18 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
 
    long getCallTimeout();
 
+   /**
+    * The metrics for this cluster connection (aggregated over all of its bridges)
+    *
+    * @return
+    */
+   ClusterConnectionMetrics getMetrics();
+
+   /**
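+    * Returns the BridgeMetrics for the bridge to the given node, if one exists
+    *
+    * @param nodeId the ID of the node the bridge connects to
+    * @return the metrics of that bridge, or null if there is no bridge to the node
+    */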
+   BridgeMetrics getBridgeMetrics(String nodeId);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 48f59f4..c811b63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
-import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private ActiveMQServer server;
 
+   private final BridgeMetrics metrics = new BridgeMetrics();
+
    public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final int initialConnectAttempts,
                      final int reconnectAttempts,
@@ -518,6 +520,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
                ref.getQueue().acknowledge(ref);
                pendingAcks.countDown();
+               metrics.incrementMessagesAcknowledged();
             } else {
                if (logger.isTraceEnabled()) {
                   logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@@ -611,13 +614,21 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          pendingAcks.countUp();
 
          try {
+            final HandleStatus status;
             if (message.isLargeMessage()) {
                deliveringLargeMessage = true;
                deliverLargeMessage(dest, ref, (LargeServerMessage) message);
-               return HandleStatus.HANDLED;
+               status = HandleStatus.HANDLED;
             } else {
-               return deliverStandardMessage(dest, ref, message);
+               status =  deliverStandardMessage(dest, ref, message);
+            }
+
+            //Only increment messages pending acknowledgement if handled by bridge
+            if (status == HandleStatus.HANDLED) {
+               metrics.incrementMessagesPendingAcknowledgement();
             }
+
+            return status;
          } catch (Exception e) {
             // If an exception happened, we must count down immediately
             pendingAcks.countDown();
@@ -771,6 +782,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public BridgeMetrics getMetrics() {
+      return this.metrics;
+   }
+
+   @Override
    public String toString() {
       return this.getClass().getSimpleName() + "@" +
          Integer.toHexString(System.identityHashCode(this)) +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
new file mode 100644
index 0000000..d8689b6
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public class BridgeMetrics {
+
+   public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
+   public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
+
+   private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY);
+
+   private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_ACKNOWLEDGED_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY);
+
+   private volatile long messagesPendingAcknowledgement;
+   private volatile long messagesAcknowledged;
+
+   public void incrementMessagesPendingAcknowledgement() {
+      MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this);
+   }
+
+   public void incrementMessagesAcknowledged() {
+      MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this);
+   }
+
+   /**
+    * @return the messagesPendingAcknowledgement
+    */
+   public long getMessagesPendingAcknowledgement() {
+      return messagesPendingAcknowledgement;
+   }
+
+   /**
+    * @return the messagesAcknowledged
+    */
+   public long getMessagesAcknowledged() {
+      return messagesAcknowledged;
+   }
+
+   /**
+    * @return New map containing the Bridge metrics
+    */
+   public Map<String, Object> convertToMap() {
+      final Map<String, Object> metrics = new HashMap<>();
+      metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
+      metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
+
+      return metrics;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 70923be..8495758 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -749,6 +749,26 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       topology.updateAsLive(nodeID, localMember);
    }
 
+
+   @Override
+   public ClusterConnectionMetrics getMetrics() {
+      long messagesPendingAcknowledgement = 0;
+      long messagesAcknowledged = 0;
+      for (MessageFlowRecord record : records.values()) {
+         final BridgeMetrics metrics = record.getBridge() != null ? record.getBridge().getMetrics() : null;
+         messagesPendingAcknowledgement += metrics != null ? metrics.getMessagesPendingAcknowledgement() : 0;
+         messagesAcknowledged += metrics != null ? metrics.getMessagesAcknowledged() : 0;
+      }
+
+      return new ClusterConnectionMetrics(messagesPendingAcknowledgement, messagesAcknowledged);
+   }
+
+   @Override
+   public BridgeMetrics getBridgeMetrics(String nodeId) {
+      final MessageFlowRecord record = records.get(nodeId);
+      return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
+   }
+
    private void createNewRecord(final long eventUID,
                                 final String targetNodeID,
                                 final TransportConfiguration connector,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
new file mode 100644
index 0000000..5b9e084
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClusterConnectionMetrics {
+
+   public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
+   public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
+
+   private final long messagesPendingAcknowledgement;
+   private final long messagesAcknowledged;
+
+   /**
+    * @param messagesPendingAcknowledgement
+    * @param messagesAcknowledged
+    */
+   public ClusterConnectionMetrics(long messagesPendingAcknowledgement, long messagesAcknowledged) {
+      super();
+      this.messagesPendingAcknowledgement = messagesPendingAcknowledgement;
+      this.messagesAcknowledged = messagesAcknowledged;
+   }
+
+   /**
+    * @return the messagesPendingAcknowledgement
+    */
+   public long getMessagesPendingAcknowledgement() {
+      return messagesPendingAcknowledgement;
+   }
+
+   /**
+    * @return the messagesAcknowledged
+    */
+   public long getMessagesAcknowledged() {
+      return messagesAcknowledged;
+   }
+
+   /**
+    * @return New map containing the Cluster Connection metrics
+    */
+   public Map<String, Object> convertToMap() {
+      final Map<String, Object> metrics = new HashMap<>();
+      metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
+      metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
+
+      return metrics;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 2d6add7..73116f8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -502,6 +503,11 @@ public class BridgeTest extends ActiveMQTestBase {
 
       sf1.close();
 
+      assertEquals(1, server0.getClusterManager().getBridges().size());
+      BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
+      assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
+      assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
+
       closeFields();
       if (server0.getConfiguration().isPersistenceEnabled()) {
          assertEquals(0, loadQueues(server0).size());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 89d5175..6e7f9b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -75,7 +75,9 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@@ -1286,6 +1288,22 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
    }
 
+   protected void verifyClusterMetrics(final int node, final String clusterName, final long expectedMessagesPendingAcknowledgement,
+         final long expectedMessagesAcknowledged) {
+      final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
+      final ClusterConnectionMetrics clusterMetrics = clusterConnection.getMetrics();
+      assertEquals(expectedMessagesPendingAcknowledgement, clusterMetrics.getMessagesPendingAcknowledgement());
+      assertEquals(expectedMessagesAcknowledged, clusterMetrics.getMessagesAcknowledged());
+   }
+
+   protected void verifyBridgeMetrics(final int node, final String clusterName, final String bridgeNodeId,
+         final long expectedMessagesPendingAcknowledgement, final long expectedMessagesAcknowledged) {
+      final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
+      final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(bridgeNodeId);
+      assertEquals(expectedMessagesPendingAcknowledgement, bridgeMetrics.getMessagesPendingAcknowledgement());
+      assertEquals(expectedMessagesAcknowledged, bridgeMetrics.getMessagesAcknowledged());
+   }
+
    protected int[] getReceivedOrder(final int consumerID) throws Exception {
       return getReceivedOrder(consumerID, false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
index e82f465..c360b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
@@ -75,6 +75,15 @@ public class OneWayChainClusterTest extends ClusterTestBase {
       send(0, "queues.testaddress", 10, false, null);
       verifyReceiveRoundRobin(10, 0, 1);
       verifyNotReceive(0, 1);
+
+      //half of the messages should be sent over bridges to the last broker in the chain
+      //as there is a consumer on that last broker
+      verifyClusterMetrics(0, "cluster0-1", 5, 5);
+      verifyClusterMetrics(1, "cluster1-2", 5, 5);
+      verifyClusterMetrics(2, "cluster2-3", 5, 5);
+      verifyClusterMetrics(3, "cluster3-4", 5, 5);
+      verifyClusterMetrics(4, "cluster4-X", 0, 0);
+
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
index f722d67..c2c6982 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
@@ -198,6 +198,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
 
       addConsumer(1, 0, "queue0", null);
       verifyNotReceive(1);
+
+      //Should be 0 as no messages were sent to the second broker
+      verifyClusterMetrics(0, "cluster1", 0, 0);
+
+      //Should be 0 as no messages were sent to the first broker
+      verifyClusterMetrics(1, "clusterX", 0, 0);
+
+      //0 messages were sent across the bridge to the second broker
+      verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 0, 0);
    }
 
    @Test
@@ -224,6 +233,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
       send(0, "queues.testaddress", 10, false, null);
       verifyReceiveRoundRobin(10, 0, 1);
       verifyNotReceive(0, 1);
+
+      //half of the messages should be sent over bridge, other half was consumed by local consumer
+      verifyClusterMetrics(0, "cluster1", 5, 5);
+
+      //Should be 0 as no messages were sent to the first broker
+      verifyClusterMetrics(1, "clusterX", 0, 0);
+
+      //5 messages were sent across the bridge to the second broker
+      verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 5, 5);
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
index df63043..c0dd06d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -64,6 +65,11 @@ public class BridgeControlTest extends ManagementTestBase {
       Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001);
       Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts());
       Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection());
+      Map<String, Object> bridgeMetrics = bridgeControl.getMetrics();
+      Assert.assertEquals(0L, bridgeControl.getMessagesPendingAcknowledgement());
+      Assert.assertEquals(0L, bridgeControl.getMessagesAcknowledged());
+      Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+      Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
 
       String[] connectorPairData = bridgeControl.getStaticConnectors();
       Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
index e0ff4c8..bd8b51d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
@@ -69,6 +70,13 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
       Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class));
       Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class));
 
+      @SuppressWarnings("unchecked")
+      Map<String, Object> bridgeMetrics = (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
+      Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class));
+      Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesAcknowledged", Long.class));
+      Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+      Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
+
       Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors");
       Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
index f414c15..1fc2c5e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
@@ -16,14 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
-import javax.json.JsonArray;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.json.JsonArray;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
@@ -82,6 +84,12 @@ public class ClusterConnectionControlTest extends ManagementTestBase {
       Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
       Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType());
       Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
+      Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement());
+      Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged());
+      Map<String, Object> clusterMetrics = clusterConnectionControl.getMetrics();
+      Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+      Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_ACKNOWLEDGED_KEY));
+      Assert.assertNull(clusterConnectionControl.getBridgeMetrics("bad"));
 
       Object[] connectors = clusterConnectionControl.getStaticConnectors();
       Assert.assertEquals(1, connectors.length);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
index 2756b1e..0c76a64 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
@@ -99,6 +99,28 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
          }
 
          @Override
+         public long getMessagesPendingAcknowledgement() {
+            return (Long) proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class);
+         }
+
+         @Override
+         public long getMessagesAcknowledged() {
+            return (Long) proxy.retrieveAttributeValue("messagesAcknowledged", Long.class);
+         }
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public Map<String, Object> getMetrics() {
+            return (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
+         }
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public Map<String, Object> getBridgeMetrics(String nodeId) throws Exception {
+            return (Map<String, Object>) proxy.invokeOperation("getBridgeMetrics", nodeId);
+         }
+
+         @Override
          public boolean isStarted() {
             return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);
          }