You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/03 18:18:44 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2003 - Add bridge metrics
ARTEMIS-2003 - Add bridge metrics
This commit adds support for tracking metrics for both normal bridges
and bridges that are part of a cluster. The two statistics added in
this commit are messages pending acknowledgement and messages
acknowledged; more can be added later.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e629ac45
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e629ac45
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e629ac45
Branch: refs/heads/master
Commit: e629ac4538988cbb957a9a2599b2430d4266c943
Parents: f980c34
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 31 13:01:32 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 3 14:18:34 2018 -0400
----------------------------------------------------------------------
.../api/core/management/BridgeControl.java | 29 ++++++++
.../management/ClusterConnectionControl.java | 48 ++++++++++++++
.../core/management/impl/BridgeControlImpl.java | 35 +++++++++-
.../impl/ClusterConnectionControlImpl.java | 48 +++++++++++++-
.../artemis/core/server/cluster/Bridge.java | 3 +
.../core/server/cluster/ClusterConnection.java | 16 +++++
.../core/server/cluster/impl/BridgeImpl.java | 22 ++++++-
.../core/server/cluster/impl/BridgeMetrics.java | 69 ++++++++++++++++++++
.../cluster/impl/ClusterConnectionImpl.java | 20 ++++++
.../cluster/impl/ClusterConnectionMetrics.java | 64 ++++++++++++++++++
.../integration/cluster/bridge/BridgeTest.java | 6 ++
.../cluster/distribution/ClusterTestBase.java | 18 +++++
.../distribution/OneWayChainClusterTest.java | 9 +++
.../distribution/OnewayTwoNodeClusterTest.java | 18 +++++
.../management/BridgeControlTest.java | 6 ++
.../management/BridgeControlUsingCoreTest.java | 8 +++
.../ClusterConnectionControlTest.java | 14 +++-
.../ClusterConnectionControlUsingCoreTest.java | 22 +++++++
18 files changed, 445 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
index 3bf4554..9dd7dc8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java
@@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl {
*/
@Attribute(desc = "whether this bridge is using high availability")
boolean isHA();
+
+ /**
+ * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but
+ * is awaiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding
+ * pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+ *
+ */
+ @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.")
+ long getMessagesPendingAcknowledgement();
+
+ /**
+ * The messagesAcknowledged counter is the number of messages actually received by the remote broker.
+ * This is a cumulative total and the number of outstanding pending messages can be computed by subtracting
+ * messagesAcknowledged from messagesPendingAcknowledgement.
+ *
+ */
+ @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
+ long getMessagesAcknowledged();
+
+ /**
+ * The bridge metrics for this bridge
+ *
+ * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but is awaiting acknowledgement from the other broker.
+ * The messagesAcknowledged counter is the number of messages actually received by the remote broker.
+ *
+ */
+ @Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
+ Map<String, Object> getMetrics();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
index 194afad..39f0825 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java
@@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
*/
@Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)")
Map<String, String> getNodes() throws Exception;
+
+ /**
+ * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
+ * forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)
+ *
+ * This is a cumulative total and the number of outstanding pending messages for the cluster connection
+ * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+ *
+ */
+ @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)")
+ long getMessagesPendingAcknowledgement();
+
+ /**
+ * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
+ * bridges in this cluster connection
+ *
+ * This is a cumulative total and the number of outstanding pending messages for the cluster connection
+ * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
+ *
+ */
+ @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
+ long getMessagesAcknowledged();
+
+ /**
+ * The current metrics for this cluster connection (aggregate over all bridges to other nodes)
+ *
+ * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
+ * forwarded a message and is waiting acknowledgement from the other broker.
+ *
+ * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
+ * bridges in this cluster connection
+ *
+ * @return
+ */
+ @Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
+ Map<String, Object> getMetrics();
+
+ /**
+ * The bridge metrics for the given node in the cluster connection
+ *
+ * The messagesPendingAcknowledgement counter is incremented when the bridge has forwarded a message but is awaiting acknowledgement from the other broker.
+ * The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.
+ *
+ * @throws Exception
+ */
+ @Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
+ Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
index 6e0e055..d0e5523 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.management.impl;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanOperationInfo;
import java.util.List;
import java.util.Map;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanOperationInfo;
+
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -228,6 +229,36 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class);
}
+ @Override
+ public long getMessagesPendingAcknowledgement() {
+ clearIO();
+ try {
+ return bridge.getMetrics().getMessagesPendingAcknowledgement();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ clearIO();
+ try {
+ return bridge.getMetrics().getMessagesAcknowledged();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getMetrics() {
+ clearIO();
+ try {
+ return bridge.getMetrics().convertToMap();
+ } finally {
+ blockOnIO();
+ }
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
index 9186dbb..24c7dcc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java
@@ -16,16 +16,18 @@
*/
package org.apache.activemq.artemis.core.management.impl;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanOperationInfo;
import java.util.List;
import java.util.Map;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanOperationInfo;
+
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl {
@@ -223,6 +225,48 @@ public class ClusterConnectionControlImpl extends AbstractControl implements Clu
return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class);
}
+ @Override
+ public long getMessagesPendingAcknowledgement() {
+ clearIO();
+ try {
+ return clusterConnection.getMetrics().getMessagesPendingAcknowledgement();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ clearIO();
+ try {
+ return clusterConnection.getMetrics().getMessagesAcknowledged();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getMetrics() {
+ clearIO();
+ try {
+ return clusterConnection.getMetrics().convertToMap();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getBridgeMetrics(String nodeId) {
+ clearIO();
+ try {
+ final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId);
+ return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null;
+ } finally {
+ blockOnIO();
+ }
+
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
index 7e8cacb..28fbc7c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
void disconnect();
boolean isConnected();
+
+ BridgeMetrics getMetrics();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
index 9392ed5..6171476 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
@@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
@@ -80,4 +82,18 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
long getCallTimeout();
+ /**
+ * The metrics for this cluster connection
+ *
+ * @return
+ */
+ ClusterConnectionMetrics getMetrics();
+
+ /**
+ * Returns the BridgeMetrics for the bridge to the given node, or null if no such bridge exists
+ *
+ * @param nodeId
+ * @return
+ */
+ BridgeMetrics getBridgeMetrics(String nodeId);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 48f59f4..c811b63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
-import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private ActiveMQServer server;
+ private final BridgeMetrics metrics = new BridgeMetrics();
+
public BridgeImpl(final ServerLocatorInternal serverLocator,
final int initialConnectAttempts,
final int reconnectAttempts,
@@ -518,6 +520,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
ref.getQueue().acknowledge(ref);
pendingAcks.countDown();
+ metrics.incrementMessagesAcknowledged();
} else {
if (logger.isTraceEnabled()) {
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@@ -611,13 +614,21 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
pendingAcks.countUp();
try {
+ final HandleStatus status;
if (message.isLargeMessage()) {
deliveringLargeMessage = true;
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
- return HandleStatus.HANDLED;
+ status = HandleStatus.HANDLED;
} else {
- return deliverStandardMessage(dest, ref, message);
+ status = deliverStandardMessage(dest, ref, message);
+ }
+
+ //Only increment messages pending acknowledgement if handled by bridge
+ if (status == HandleStatus.HANDLED) {
+ metrics.incrementMessagesPendingAcknowledgement();
}
+
+ return status;
} catch (Exception e) {
// If an exception happened, we must count down immediately
pendingAcks.countDown();
@@ -771,6 +782,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
@Override
+ public BridgeMetrics getMetrics() {
+ return this.metrics;
+ }
+
+ @Override
public String toString() {
return this.getClass().getSimpleName() + "@" +
Integer.toHexString(System.identityHashCode(this)) +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
new file mode 100644
index 0000000..d8689b6
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public class BridgeMetrics {
+
+ public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
+ public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
+
+ private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY);
+
+ private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_ACKNOWLEDGED_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY);
+
+ private volatile long messagesPendingAcknowledgement;
+ private volatile long messagesAcknowledged;
+
+ public void incrementMessagesPendingAcknowledgement() {
+ MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this);
+ }
+
+ public void incrementMessagesAcknowledged() {
+ MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this);
+ }
+
+ /**
+ * @return the current cumulative messagesPendingAcknowledgement count (a volatile read; this class only ever increments it)
+ */
+ public long getMessagesPendingAcknowledgement() {
+ return messagesPendingAcknowledgement;
+ }
+
+ /**
+ * @return the current cumulative messagesAcknowledged count (a volatile read; this class only ever increments it)
+ */
+ public long getMessagesAcknowledged() {
+ return messagesAcknowledged;
+ }
+
+ /**
+ * @return a new HashMap holding both counters under the two KEY constants; each counter is read separately, so the pair is not an atomic snapshot
+ */
+ public Map<String, Object> convertToMap() {
+ final Map<String, Object> metrics = new HashMap<>();
+ metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
+ metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
+
+ return metrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 70923be..8495758 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -749,6 +749,26 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
topology.updateAsLive(nodeID, localMember);
}
+
+ @Override
+ public ClusterConnectionMetrics getMetrics() {
+ long messagesPendingAcknowledgement = 0;
+ long messagesAcknowledged = 0;
+ for (MessageFlowRecord record : records.values()) {
+ final BridgeMetrics metrics = record.getBridge() != null ? record.getBridge().getMetrics() : null;
+ messagesPendingAcknowledgement += metrics != null ? metrics.getMessagesPendingAcknowledgement() : 0;
+ messagesAcknowledged += metrics != null ? metrics.getMessagesAcknowledged() : 0;
+ }
+
+ return new ClusterConnectionMetrics(messagesPendingAcknowledgement, messagesAcknowledged);
+ }
+
+ @Override
+ public BridgeMetrics getBridgeMetrics(String nodeId) {
+ final MessageFlowRecord record = records.get(nodeId);
+ return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
+ }
+
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
new file mode 100644
index 0000000..5b9e084
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClusterConnectionMetrics {
+
+ public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
+ public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
+
+ private final long messagesPendingAcknowledgement;
+ private final long messagesAcknowledged;
+
+ /**
+ * @param messagesPendingAcknowledgement value stored as-is and reported by getMessagesPendingAcknowledgement()
+ * @param messagesAcknowledged value stored as-is and reported by getMessagesAcknowledged()
+ */
+ public ClusterConnectionMetrics(long messagesPendingAcknowledgement, long messagesAcknowledged) {
+ super();
+ this.messagesPendingAcknowledgement = messagesPendingAcknowledgement;
+ this.messagesAcknowledged = messagesAcknowledged;
+ }
+
+ /**
+ * @return the messagesPendingAcknowledgement value supplied at construction (the field is final)
+ */
+ public long getMessagesPendingAcknowledgement() {
+ return messagesPendingAcknowledgement;
+ }
+
+ /**
+ * @return the messagesAcknowledged value supplied at construction (the field is final)
+ */
+ public long getMessagesAcknowledged() {
+ return messagesAcknowledged;
+ }
+
+ /**
+ * @return a new HashMap holding both values under MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY and MESSAGES_ACKNOWLEDGED_KEY
+ */
+ public Map<String, Object> convertToMap() {
+ final Map<String, Object> metrics = new HashMap<>();
+ metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
+ metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
+
+ return metrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 2d6add7..73116f8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -502,6 +503,11 @@ public class BridgeTest extends ActiveMQTestBase {
sf1.close();
+ assertEquals(1, server0.getClusterManager().getBridges().size());
+ BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
+ assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
+ assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
+
closeFields();
if (server0.getConfiguration().isPersistenceEnabled()) {
assertEquals(0, loadQueues(server0).size());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 89d5175..6e7f9b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -75,7 +75,9 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@@ -1286,6 +1288,22 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
}
+ /**
+ * Asserts the two metric counters exposed by a cluster connection.
+ *
+ * @param node index into {@code servers} of the broker to inspect
+ * @param clusterName name used to look up the cluster connection on that broker
+ * @param expectedMessagesPendingAcknowledgement expected {@code getMessagesPendingAcknowledgement()} value
+ * @param expectedMessagesAcknowledged expected {@code getMessagesAcknowledged()} value
+ */
+ protected void verifyClusterMetrics(final int node, final String clusterName, final long expectedMessagesPendingAcknowledgement,
+ final long expectedMessagesAcknowledged) {
+ // Resolve broker -> cluster manager -> named cluster connection -> metrics in one chain.
+ final ClusterConnectionMetrics metrics = servers[node].getClusterManager().getClusterConnection(clusterName).getMetrics();
+ assertEquals(expectedMessagesPendingAcknowledgement, metrics.getMessagesPendingAcknowledgement());
+ assertEquals(expectedMessagesAcknowledged, metrics.getMessagesAcknowledged());
+ }
+
+ /**
+ * Asserts the two metric counters of a single bridge that belongs to a cluster connection.
+ *
+ * @param node index into {@code servers} of the broker that owns the cluster connection
+ * @param clusterName name used to look up the cluster connection on that broker
+ * @param bridgeNodeId node ID handed to {@code ClusterConnection.getBridgeMetrics}
+ * @param expectedMessagesPendingAcknowledgement expected {@code getMessagesPendingAcknowledgement()} value
+ * @param expectedMessagesAcknowledged expected {@code getMessagesAcknowledged()} value
+ */
+ protected void verifyBridgeMetrics(final int node, final String clusterName, final String bridgeNodeId,
+ final long expectedMessagesPendingAcknowledgement, final long expectedMessagesAcknowledged) {
+ final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
+ final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(bridgeNodeId);
+ // Fail with a readable message rather than a NullPointerException when no metrics come back
+ // (presumably the case for a node ID without a bridge -- confirm against ClusterConnection).
+ if (bridgeMetrics == null) {
+ throw new AssertionError("No bridge metrics for node ID " + bridgeNodeId + " on cluster connection " + clusterName);
+ }
+ assertEquals("messagesPendingAcknowledgement", expectedMessagesPendingAcknowledgement, bridgeMetrics.getMessagesPendingAcknowledgement());
+ assertEquals("messagesAcknowledged", expectedMessagesAcknowledged, bridgeMetrics.getMessagesAcknowledged());
+ }
+
protected int[] getReceivedOrder(final int consumerID) throws Exception {
return getReceivedOrder(consumerID, false);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
index e82f465..c360b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java
@@ -75,6 +75,15 @@ public class OneWayChainClusterTest extends ClusterTestBase {
send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1);
+
+ //half of the messages should be sent over bridges to the last broker in the chain
+ //as there is a consumer on that last broker
+ verifyClusterMetrics(0, "cluster0-1", 5, 5);
+ verifyClusterMetrics(1, "cluster1-2", 5, 5);
+ verifyClusterMetrics(2, "cluster2-3", 5, 5);
+ verifyClusterMetrics(3, "cluster3-4", 5, 5);
+ verifyClusterMetrics(4, "cluster4-X", 0, 0);
+
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
index f722d67..c2c6982 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
@@ -198,6 +198,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
addConsumer(1, 0, "queue0", null);
verifyNotReceive(1);
+
+ //Should be 0 as no messages were sent to the second broker
+ verifyClusterMetrics(0, "cluster1", 0, 0);
+
+ //Should be 0 as no messages were sent to the first broker
+ verifyClusterMetrics(1, "clusterX", 0, 0);
+
+ //0 messages were sent across the bridge to the second broker
+ verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 0, 0);
}
@Test
@@ -224,6 +233,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1);
+
+ //half of the messages should be sent over the bridge; the other half was consumed by the local consumer
+ verifyClusterMetrics(0, "cluster1", 5, 5);
+
+ //Should be 0 as no messages were sent to the first broker
+ verifyClusterMetrics(1, "clusterX", 0, 0);
+
+ //5 messages were sent across the bridge to the second broker
+ verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 5, 5);
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
index df63043..c0dd06d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -64,6 +65,11 @@ public class BridgeControlTest extends ManagementTestBase {
Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001);
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts());
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection());
+ Map<String, Object> bridgeMetrics = bridgeControl.getMetrics();
+ Assert.assertEquals(0L, bridgeControl.getMessagesPendingAcknowledgement());
+ Assert.assertEquals(0L, bridgeControl.getMessagesAcknowledged());
+ Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+ Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
String[] connectorPairData = bridgeControl.getStaticConnectors();
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
index e0ff4c8..bd8b51d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
@@ -69,6 +70,13 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class));
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> bridgeMetrics = (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
+ Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class));
+ Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesAcknowledged", Long.class));
+ Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+ Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
+
Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors");
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
index f414c15..1fc2c5e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java
@@ -16,14 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
-import javax.json.JsonArray;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.json.JsonArray;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
@@ -82,6 +84,12 @@ public class ClusterConnectionControlTest extends ManagementTestBase {
Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType());
Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
+ Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement());
+ Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged());
+ Map<String, Object> clusterMetrics = clusterConnectionControl.getMetrics();
+ Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
+ Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_ACKNOWLEDGED_KEY));
+ Assert.assertNull(clusterConnectionControl.getBridgeMetrics("bad"));
Object[] connectors = clusterConnectionControl.getStaticConnectors();
Assert.assertEquals(1, connectors.length);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e629ac45/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
index 2756b1e..0c76a64 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
@@ -99,6 +99,28 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
}
@Override
+ public long getMessagesPendingAcknowledgement() {
+ // Read the "messagesPendingAcknowledgement" attribute through the core management proxy.
+ final Object pending = proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class);
+ return (Long) pending;
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ // Read the "messagesAcknowledged" attribute through the core management proxy.
+ final Object acknowledged = proxy.retrieveAttributeValue("messagesAcknowledged", Long.class);
+ return (Long) acknowledged;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getMetrics() {
+ // Read the "metrics" attribute through the core management proxy; the cast is unchecked.
+ final Object metrics = proxy.retrieveAttributeValue("metrics", Map.class);
+ return (Map<String, Object>) metrics;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getBridgeMetrics(String nodeId) throws Exception {
+ // Invoke the "getBridgeMetrics" operation through the core management proxy; the cast is unchecked.
+ final Object bridgeMetrics = proxy.invokeOperation("getBridgeMetrics", nodeId);
+ return (Map<String, Object>) bridgeMetrics;
+ }
+
+ @Override
public boolean isStarted() {
return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);
}