You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2022/07/18 21:20:32 UTC

[activemq] branch main updated: [AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)

This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 680717cb9 [AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)
680717cb9 is described below

commit 680717cb9527a52d8f7ec81c5f9df675dd110ad4
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Jul 18 16:20:25 2022 -0500

    [AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)
---
 .../amqp/client/AmqpClientTestSupport.java          |  2 +-
 .../interop/AmqpConfiguredMaxConnectionsTest.java   |  6 ++++++
 .../java/org/apache/activemq/broker/Connector.java  | 15 +++++++++++----
 .../apache/activemq/broker/TransportConnection.java |  2 --
 .../apache/activemq/broker/TransportConnector.java  | 14 ++++++++++++++
 .../apache/activemq/broker/jmx/ConnectorView.java   |  7 ++++++-
 .../activemq/broker/jmx/ConnectorViewMBean.java     |  7 ++++++-
 .../activemq/transport/vm/VMTransportServer.java    | 11 +++++++++++
 .../apache/activemq/transport/TransportServer.java  |  4 ++++
 .../activemq/transport/TransportServerFilter.java   | 18 ++++++++++++++++++
 .../activemq/transport/tcp/TcpTransportServer.java  | 21 +++++++++++++++++----
 .../activemq/transport/udp/UdpTransportServer.java  | 11 +++++++++++
 .../transport/http/HttpTransportServer.java         | 11 +++++++++++
 .../activemq/transport/ws/WSTransportServer.java    | 11 +++++++++++
 .../auto/AutoTransportConnectionsTest.java          |  8 ++++++++
 15 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
index b42193129..d429b27c5 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -29,7 +29,7 @@ import org.junit.After;
  */
 public class AmqpClientTestSupport extends AmqpTestSupport {
 
-    private String connectorScheme = "amqp";
+    protected String connectorScheme = "amqp";
     private boolean useSSL;
 
     private List<AmqpConnection> connections = new ArrayList<AmqpConnection>();
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
index b46b46abd..6b97698af 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
@@ -68,6 +68,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
         }
 
         assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+        assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
 
         try {
             AmqpConnection connection = trackConnection(client.createConnection());
@@ -78,12 +79,17 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
         }
 
         assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+        assertEquals(Long.valueOf(1l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
 
         for (AmqpConnection connection : connections) {
             connection.close();
         }
 
         assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+
+        // Confirm reset statistics
+        getProxyToConnectionView(getConnectorScheme()).resetStatistics();
+        assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
index acb7e69f0..c31e7692b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
@@ -36,7 +36,12 @@ public interface Connector extends Service {
      * @return the statistics for this connector
      */
     ConnectorStatistics getStatistics();
-    
+
+    /**
+     * Reset Connector statistics
+     */
+    void resetStatistics();
+
     /**
      * @return true if update client connections when brokers leave/join a cluster
      */
@@ -46,13 +51,13 @@ public interface Connector extends Service {
      * @return true if clients should be re-balanced across the cluster
      */
     public boolean isRebalanceClusterClients();
-    
+
     /**
      * Update all the connections with information
      * about the connected brokers in the cluster
      */
     public void updateClientClusterInfo();
-    
+
     /**
      * @return true if clients should be updated when
      * a broker is removed from a broker
@@ -66,10 +71,12 @@ public interface Connector extends Service {
      * @return true/false if link stealing is enabled
      */
     boolean isAllowLinkStealing();
-    
+
     /**
      * @return The comma separated string of regex patterns to match 
      * broker names for cluster client updates
      */
     String getUpdateClusterFilter();
+
+    long getMaxConnectionExceededCount();
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 7533a714e..172c6a1f9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -19,7 +19,6 @@ package org.apache.activemq.broker;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketException;
-import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -101,7 +100,6 @@ import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.NetworkBridgeUtils;
-import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 1b9c41fa4..0498795c6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -190,6 +190,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
         return statistics;
     }
 
+    /**
+     * Reset the statistics for this connector
+     */
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+        server.resetStatistics();
+    }
+
     public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
         return messageAuthorizationPolicy;
     }
@@ -673,4 +682,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
     public void setDisplayStackTrace(boolean displayStackTrace) {
         this.displayStackTrace = displayStackTrace;
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        return (server != null ? server.getMaxConnectionExceededCount() : 0l);
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
index 907829d8f..d9960e9fd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorView.java
@@ -54,7 +54,7 @@ public class ConnectorView implements ConnectorViewMBean {
      */
     @Override
     public void resetStatistics() {
-        connector.getStatistics().reset();
+        connector.resetStatistics();
     }
 
     /**
@@ -136,4 +136,9 @@ public class ConnectorView implements ConnectorViewMBean {
     public boolean isAllowLinkStealingEnabled() {
         return this.connector.isAllowLinkStealing();
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        return this.connector.getMaxConnectionExceededCount();
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
index 647d34a99..97ff8f572 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ConnectorViewMBean.java
@@ -83,5 +83,10 @@ public interface ConnectorViewMBean extends Service {
     @MBeanInfo("Comma separated list of regex patterns to match broker names for cluster client updates.")
     String getUpdateClusterFilter();
 
-
+    /**
+     * @return The number of times the max connection count
+     * has been exceeded
+     */
+    @MBeanInfo("Max connection exceeded count")
+    long getMaxConnectionExceededCount();
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 8bef1cc46..3ed6012a8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -155,4 +155,15 @@ public class VMTransportServer implements TransportServer {
     public void setAllowLinkStealing(boolean allowLinkStealing) {
         this.allowLinkStealing = allowLinkStealing;
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        // VM transport is not limited
+        return -1l;
+    }
+
+    @Override
+    public void resetStatistics() {
+        // VM transport does not implement statistics
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
index 152edd1af..e35b53736 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
@@ -73,4 +73,8 @@ public interface TransportServer extends Service {
      * @return true if allow link stealing is enabled.
      */
     boolean isAllowLinkStealing();
+
+    long getMaxConnectionExceededCount();
+
+    void resetStatistics();
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
index e30877428..6a247125c 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
@@ -32,35 +32,53 @@ public class TransportServerFilter implements TransportServer {
         this.next = next;
     }
 
+    @Override
     public URI getConnectURI() {
         return next.getConnectURI();
     }
 
+    @Override
     public void setAcceptListener(TransportAcceptListener acceptListener) {
         next.setAcceptListener(acceptListener);
     }
 
+    @Override
     public void setBrokerInfo(BrokerInfo brokerInfo) {
         next.setBrokerInfo(brokerInfo);
     }
 
+    @Override
     public void start() throws Exception {
         next.start();
     }
 
+    @Override
     public void stop() throws Exception {
         next.stop();
     }
 
+    @Override
     public InetSocketAddress getSocketAddress() {
         return next.getSocketAddress();
     }
 
+    @Override
     public boolean isSslServer() {
         return next.isSslServer();
     }
 
+    @Override
     public boolean isAllowLinkStealing() {
         return next.isAllowLinkStealing();
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        return next.getMaxConnectionExceededCount();
+    }
+
+    @Override
+    public void resetStatistics() {
+        next.resetStatistics();
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 6d642c02d..667cb34bc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ServerSocketFactory;
 import javax.net.ssl.SSLParameters;
@@ -123,6 +124,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      * The maximum number of sockets allowed for this server
      */
     protected int maximumConnections = Integer.MAX_VALUE;
+    protected final AtomicLong maximumConnectionsExceededCount = new AtomicLong(0l);
     protected final AtomicInteger currentTransportCount = new AtomicInteger();
 
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
@@ -579,10 +581,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
             do {
                 currentCount = currentTransportCount.get();
                 if (currentCount >= this.maximumConnections) {
-                     throw new ExceededMaximumConnectionsException(
-                         "Exceeded the maximum number of allowed client connections. See the '" +
-                         "maximumConnections' property on the TCP transport configuration URI " +
-                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
+                    this.maximumConnectionsExceededCount.incrementAndGet();
+                    throw new ExceededMaximumConnectionsException(
+                        "Exceeded the maximum number of allowed client connections. See the '" +
+                        "maximumConnections' property on the TCP transport configuration URI " +
+                        "in the ActiveMQ configuration file (e.g., activemq.xml)");
                  }
 
             //Increment this value before configuring the transport
@@ -726,4 +729,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
     public void setAllowLinkStealing(boolean allowLinkStealing) {
         this.allowLinkStealing = allowLinkStealing;
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        return this.maximumConnectionsExceededCount.get();
+    }
+
+    @Override
+    public void resetStatistics() {
+        this.maximumConnectionsExceededCount.set(0l);
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index ccf7abefb..2e7163969 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -199,4 +199,15 @@ public class UdpTransportServer extends TransportServerSupport {
     public void setAllowLinkStealing(boolean allowLinkStealing) {
         this.allowLinkStealing = allowLinkStealing;
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        // UDP transport deprecated
+        return -1l;
+    }
+
+    @Override
+    public void resetStatistics() {
+        // UDP transport deprecated
+    }
 }
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
index c25e4b919..5b21f0a9c 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
@@ -189,4 +189,15 @@ public class HttpTransportServer extends WebTransportServerSupport {
     public boolean isSslServer() {
         return false;
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        // Max Connection Count not supported for http
+        return -1l;
+    }
+
+    @Override
+    public void resetStatistics() {
+        // Statistics not implemented for http
+    }
 }
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 7aaaecc73..dd559baa0 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -172,4 +172,15 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
             servlet.setBrokerService(brokerService);
         }
     }
+
+    @Override
+    public long getMaxConnectionExceededCount() {
+        // Max Connection Count not supported for ws
+        return -1l;
+    }
+
+    @Override
+    public void resetStatistics() {
+        // Statistics not implemented for ws
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
index 46f82d487..f68b4e19b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
@@ -147,6 +147,8 @@ public class AutoTransportConnectionsTest {
         assertEquals(maxConnections, transportServer.getMaximumConnections());
         // No connections at first
         assertEquals(0, connector.getConnections().size());
+        // No connections exceeded at first
+        assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
         // Release the latch to set up connections in parallel
         startupLatch.countDown();
 
@@ -162,6 +164,12 @@ public class AutoTransportConnectionsTest {
             })
         );
 
+        // The 10 extra connections exceeded the maximum connection count
+        assertEquals(Long.valueOf(10l), Long.valueOf(connector.getMaxConnectionExceededCount()));
+
+        // Confirm reset statistics
+        connector.resetStatistics();
+        assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
     }
 
     @Test