You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/06/22 20:38:37 UTC

[qpid-broker-j] branch main updated: QPID-8536: [Broker-J] Incorrect check of maximum open connections

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 53a99be  QPID-8536: [Broker-J] Incorrect check of maximum open connections
53a99be is described below

commit 53a99be667f06dfbe2d3a843c538fbe918375392
Author: Marek Laca <mk...@gmail.com>
AuthorDate: Tue Jun 15 10:33:25 2021 +0200

    QPID-8536: [Broker-J] Incorrect check of maximum open connections
    
    This closes #93
---
 .../apache/qpid/server/model/port/AmqpPort.java    |  8 +--
 .../qpid/server/model/port/AmqpPortImpl.java       | 72 ++++++++++------------
 .../MultiVersionProtocolEngineFactory.java         |  3 +-
 .../qpid/server/model/port/AmqpPortImplTest.java   | 17 +++--
 .../server/transport/TCPandSSLTransportTest.java   |  2 +-
 5 files changed, 46 insertions(+), 56 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index c2b2e3d..6bfcc38 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -172,7 +172,7 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
             label = "Open Connections",
             description = "Current number of connections made through this port",
             metricName = "open_connections_total")
-    int getConnectionCount();
+    long getConnectionCount();
 
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
             label = "Total Connections",
@@ -204,11 +204,9 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
                                     + " 0 disables.")
     int getHeartbeatDelay();
 
-    boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
+    boolean acceptNewConnectionAndIncrementCount(final SocketAddress remoteSocketAddress);
 
-    int incrementConnectionCount();
-
-    int decrementConnectionCount();
+    long decrementConnectionCount();
 
     int getNetworkBufferSize();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index a3b8156..f59e280 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLContext;
@@ -91,7 +90,7 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
     @ManagedAttributeField
     private int _numberOfSelectors;
 
-    private final AtomicInteger _connectionCount = new AtomicInteger();
+    private final AtomicLong _connectionCount = new AtomicLong();
     private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
     private final AtomicLong _totalConnectionCount = new AtomicLong();
 
@@ -551,69 +550,66 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
     }
 
     @Override
-    public int incrementConnectionCount()
+    public long decrementConnectionCount()
     {
-        int openConnections = _connectionCount.incrementAndGet();
-        _totalConnectionCount.incrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
-        if(maxOpenConnections > 0
-           && openConnections > (maxOpenConnections * _connectionWarnCount) / 100
-           && _connectionCountWarningGiven.compareAndSet(false, true))
-        {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_COUNT_WARN(openConnections,
-                                                                                _connectionWarnCount,
-                                                                                maxOpenConnections));
-        }
-        return openConnections;
-    }
+        final long maxOpenConnections = getMaxOpenConnections();
+        final long openConnections = _connectionCount.decrementAndGet();
 
-    @Override
-    public int decrementConnectionCount()
-    {
-        int openConnections = _connectionCount.decrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
-
-        if(maxOpenConnections > 0
-           && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000)
+        if (maxOpenConnections > 0L
+                && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000L)
         {
-           _connectionCountWarningGiven.compareAndSet(true,false);
+            _connectionCountWarningGiven.compareAndSet(true, false);
         }
 
-
         return openConnections;
     }
 
-    private static int square(int val)
+    private static long square(long val)
     {
         return val * val;
     }
 
     @Override
-    public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
+    public boolean acceptNewConnectionAndIncrementCount(final SocketAddress remoteSocketAddress)
     {
-        String addressString = remoteSocketAddress.toString();
+        final String addressString = remoteSocketAddress.toString();
         if (_closingOrDeleting.get())
         {
             _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+                    PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
             return false;
         }
-        else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections)
+        final long maxOpenConnections = getMaxOpenConnections();
+        if (maxOpenConnections > 0L)
         {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
-                                                                                       _maxOpenConnections));
-            return false;
+            long openConnections = _connectionCount
+                    .getAndUpdate(count -> count < maxOpenConnections ? count + 1L : count);
+
+            if (openConnections >= maxOpenConnections)
+            {
+                _container.getEventLogger().message(new PortLogSubject(this),
+                        PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString, _maxOpenConnections));
+                return false;
+            }
+
+            openConnections++;
+            if (openConnections > (maxOpenConnections * _connectionWarnCount) / 100L
+                    && _connectionCountWarningGiven.compareAndSet(false, true))
+            {
+                _container.getEventLogger().message(new PortLogSubject(this),
+                        PortMessages.CONNECTION_COUNT_WARN(openConnections, _connectionWarnCount, maxOpenConnections));
+            }
         }
         else
         {
-            return true;
+            _connectionCount.incrementAndGet();
         }
+        _totalConnectionCount.incrementAndGet();
+        return true;
     }
 
     @Override
-    public int getConnectionCount()
+    public long getConnectionCount()
     {
         return _connectionCount.get();
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
index b9f2303..a29552d 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
@@ -83,9 +83,8 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
     @Override
     public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
     {
-        if(_port.canAcceptNewConnection(remoteSocketAddress))
+        if(_port.acceptNewConnectionAndIncrementCount(remoteSocketAddress))
         {
-            _port.incrementConnectionCount();
             return new MultiVersionProtocolEngine(_broker,
                                                   _supported, _defaultSupportedReply, _port, _transport,
                                                   ID_GENERATOR.getAndIncrement(),
diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
index df46bda..257c47b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
@@ -335,23 +335,20 @@ public class AmqpPortImplTest extends UnitTestBase
 
         for(int i = 0; i < 8; i++)
         {
-            assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
-            _port.incrementConnectionCount();
-            assertEquals((long) (i + 1), (long) _port.getConnectionCount());
+            assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+            assertEquals(i + 1L, _port.getConnectionCount());
             verify(mockLogger, never()).message(any(LogSubject.class), any(LogMessage.class));
         }
 
-        assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
-        _port.incrementConnectionCount();
-        assertEquals((long) 9, (long) _port.getConnectionCount());
+        assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+        assertEquals(9, _port.getConnectionCount());
         verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
 
-        assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
-        _port.incrementConnectionCount();
-        assertEquals((long) 10, (long) _port.getConnectionCount());
+        assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+        assertEquals(10, _port.getConnectionCount());
         verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
 
-        assertFalse(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+        assertFalse(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
     }
 
     private AmqpPortImpl createPort(final String portName)
diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 8e24610..51902ef 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -264,7 +264,7 @@ public class TCPandSSLTransportTest extends UnitTestBase
         when(port.getPort()).thenReturn(0);
         when(port.getName()).thenReturn("testAmqp");
         when(port.getNetworkBufferSize()).thenReturn(64*1024);
-        when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+        when(port.acceptNewConnectionAndIncrementCount(any(SocketAddress.class))).thenReturn(true);
         when(port.getThreadPoolSize()).thenReturn(2);
         when(port.getNumberOfSelectors()).thenReturn(1);
         when(port.getSSLContext()).thenReturn(sslContext);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org