You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2022/07/18 21:20:32 UTC
[activemq] branch main updated: [AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)
This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 680717cb9 [AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)
680717cb9 is described below
commit 680717cb9527a52d8f7ec81c5f9df675dd110ad4
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Jul 18 16:20:25 2022 -0500
[AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)
---
.../amqp/client/AmqpClientTestSupport.java | 2 +-
.../interop/AmqpConfiguredMaxConnectionsTest.java | 6 ++++++
.../java/org/apache/activemq/broker/Connector.java | 15 +++++++++++----
.../apache/activemq/broker/TransportConnection.java | 2 --
.../apache/activemq/broker/TransportConnector.java | 14 ++++++++++++++
.../apache/activemq/broker/jmx/ConnectorView.java | 7 ++++++-
.../activemq/broker/jmx/ConnectorViewMBean.java | 7 ++++++-
.../activemq/transport/vm/VMTransportServer.java | 11 +++++++++++
.../apache/activemq/transport/TransportServer.java | 4 ++++
.../activemq/transport/TransportServerFilter.java | 18 ++++++++++++++++++
.../activemq/transport/tcp/TcpTransportServer.java | 21 +++++++++++++++++----
.../activemq/transport/udp/UdpTransportServer.java | 11 +++++++++++
.../transport/http/HttpTransportServer.java | 11 +++++++++++
.../activemq/transport/ws/WSTransportServer.java | 11 +++++++++++
.../auto/AutoTransportConnectionsTest.java | 8 ++++++++
15 files changed, 135 insertions(+), 13 deletions(-)
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
index b42193129..d429b27c5 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -29,7 +29,7 @@ import org.junit.After;
*/
public class AmqpClientTestSupport extends AmqpTestSupport {
- private String connectorScheme = "amqp";
+ protected String connectorScheme = "amqp";
private boolean useSSL;
private List<AmqpConnection> connections = new ArrayList<AmqpConnection>();
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
index b46b46abd..6b97698af 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
@@ -68,6 +68,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
}
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+ assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
try {
AmqpConnection connection = trackConnection(client.createConnection());
@@ -78,12 +79,17 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
}
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+ assertEquals(Long.valueOf(1l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
for (AmqpConnection connection : connections) {
connection.close();
}
assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+
+ // Confirm reset statistics
+ getProxyToConnectionView(getConnectorScheme()).resetStatistics();
+ assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
}
@Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
index acb7e69f0..c31e7692b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
@@ -36,7 +36,12 @@ public interface Connector extends Service {
* @return the statistics for this connector
*/
ConnectorStatistics getStatistics();
-
+
+ /**
+ * Reset Connector statistics
+ */
+ void resetStatistics();
+
/**
* @return true if update client connections when brokers leave/join a cluster
*/
@@ -46,13 +51,13 @@ public interface Connector extends Service {
* @return true if clients should be re-balanced across the cluster
*/
public boolean isRebalanceClusterClients();
-
+
/**
* Update all the connections with information
* about the connected brokers in the cluster
*/
public void updateClientClusterInfo();
-
+
/**
* @return true if clients should be updated when
* a broker is removed from a broker
@@ -66,10 +71,12 @@ public interface Connector extends Service {
* @return true/false if link stealing is enabled
*/
boolean isAllowLinkStealing();
-
+
/**
* @return The comma separated string of regex patterns to match
* broker names for cluster client updates
*/
String getUpdateClusterFilter();
+
+ long getMaxConnectionExceededCount();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 7533a714e..172c6a1f9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -19,7 +19,6 @@ package org.apache.activemq.broker;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
-import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -101,7 +100,6 @@ import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
-import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 1b9c41fa4..0498795c6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -190,6 +190,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return statistics;
}
+ /**
+ * Reset the statistics for this connector
+ */
+ @Override
+ public void resetStatistics() {
+ statistics.reset();
+ server.resetStatistics();
+ }
+
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
@@ -673,4 +682,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public void setDisplayStackTrace(boolean displayStackTrace) {
this.displayStackTrace = displayStackTrace;
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ return (server != null ? server.getMaxConnectionExceededCount() : 0l);
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
index 907829d8f..d9960e9fd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
@@ -54,7 +54,7 @@ public class ConnectorView implements ConnectorViewMBean {
*/
@Override
public void resetStatistics() {
- connector.getStatistics().reset();
+ connector.resetStatistics();
}
/**
@@ -136,4 +136,9 @@ public class ConnectorView implements ConnectorViewMBean {
public boolean isAllowLinkStealingEnabled() {
return this.connector.isAllowLinkStealing();
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ return this.connector.getMaxConnectionExceededCount();
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
index 647d34a99..97ff8f572 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
@@ -83,5 +83,10 @@ public interface ConnectorViewMBean extends Service {
@MBeanInfo("Comma separated list of regex patterns to match broker names for cluster client updates.")
String getUpdateClusterFilter();
-
+ /**
+ * @return The number of occurrences the max connection count
+ * has been exceed
+ */
+ @MBeanInfo("Max connection exceeded count")
+ long getMaxConnectionExceededCount();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 8bef1cc46..3ed6012a8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -155,4 +155,15 @@ public class VMTransportServer implements TransportServer {
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ // VM transport is not limited
+ return -1l;
+ }
+
+ @Override
+ public void resetStatistics() {
+ // VM transport does not implement statistics
+ }
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
index 152edd1af..e35b53736 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
@@ -73,4 +73,8 @@ public interface TransportServer extends Service {
* @return true if allow link stealing is enabled.
*/
boolean isAllowLinkStealing();
+
+ long getMaxConnectionExceededCount();
+
+ void resetStatistics();
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
index e30877428..6a247125c 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
@@ -32,35 +32,53 @@ public class TransportServerFilter implements TransportServer {
this.next = next;
}
+ @Override
public URI getConnectURI() {
return next.getConnectURI();
}
+ @Override
public void setAcceptListener(TransportAcceptListener acceptListener) {
next.setAcceptListener(acceptListener);
}
+ @Override
public void setBrokerInfo(BrokerInfo brokerInfo) {
next.setBrokerInfo(brokerInfo);
}
+ @Override
public void start() throws Exception {
next.start();
}
+ @Override
public void stop() throws Exception {
next.stop();
}
+ @Override
public InetSocketAddress getSocketAddress() {
return next.getSocketAddress();
}
+ @Override
public boolean isSslServer() {
return next.isSslServer();
}
+ @Override
public boolean isAllowLinkStealing() {
return next.isAllowLinkStealing();
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ return next.getMaxConnectionExceededCount();
+ }
+
+ @Override
+ public void resetStatistics() {
+ next.resetStatistics();
+ }
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 6d642c02d..667cb34bc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLParameters;
@@ -123,6 +124,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
* The maximum number of sockets allowed for this server
*/
protected int maximumConnections = Integer.MAX_VALUE;
+ protected final AtomicLong maximumConnectionsExceededCount = new AtomicLong(0l);
protected final AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
@@ -579,10 +581,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
do {
currentCount = currentTransportCount.get();
if (currentCount >= this.maximumConnections) {
- throw new ExceededMaximumConnectionsException(
- "Exceeded the maximum number of allowed client connections. See the '" +
- "maximumConnections' property on the TCP transport configuration URI " +
- "in the ActiveMQ configuration file (e.g., activemq.xml)");
+ this.maximumConnectionsExceededCount.incrementAndGet();
+ throw new ExceededMaximumConnectionsException(
+ "Exceeded the maximum number of allowed client connections. See the '" +
+ "maximumConnections' property on the TCP transport configuration URI " +
+ "in the ActiveMQ configuration file (e.g., activemq.xml)");
}
//Increment this value before configuring the transport
@@ -726,4 +729,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ return this.maximumConnectionsExceededCount.get();
+ }
+
+ @Override
+ public void resetStatistics() {
+ this.maximumConnectionsExceededCount.set(0l);
+ }
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index ccf7abefb..2e7163969 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -199,4 +199,15 @@ public class UdpTransportServer extends TransportServerSupport {
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ // UDP transport deprecated
+ return -1l;
+ }
+
+ @Override
+ public void resetStatistics() {
+ // UDP transport deprecated
+ }
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
index c25e4b919..5b21f0a9c 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
@@ -189,4 +189,15 @@ public class HttpTransportServer extends WebTransportServerSupport {
public boolean isSslServer() {
return false;
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ // Max Connection Count not supported for http
+ return -1l;
+ }
+
+ @Override
+ public void resetStatistics() {
+ // Statistics not implemented for http
+ }
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 7aaaecc73..dd559baa0 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -172,4 +172,15 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
servlet.setBrokerService(brokerService);
}
}
+
+ @Override
+ public long getMaxConnectionExceededCount() {
+ // Max Connection Count not supported for ws
+ return -1l;
+ }
+
+ @Override
+ public void resetStatistics() {
+ // Statistics not implemented for ws
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
index 46f82d487..f68b4e19b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
@@ -147,6 +147,8 @@ public class AutoTransportConnectionsTest {
assertEquals(maxConnections, transportServer.getMaximumConnections());
// No connections at first
assertEquals(0, connector.getConnections().size());
+ // No connections exceeded at first
+ assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
// Release the latch to set up connections in parallel
startupLatch.countDown();
@@ -162,6 +164,12 @@ public class AutoTransportConnectionsTest {
})
);
+ // The 10 extra connections exceeded connection count
+ assertEquals(Long.valueOf(10l), Long.valueOf(connector.getMaxConnectionExceededCount()));
+
+ // Confirm reset statistics
+ connector.resetStatistics();
+ assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
}
@Test