You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/06/22 20:38:37 UTC
[qpid-broker-j] branch main updated: QPID-8536: [Broker-J]
Incorrect check of maximum open connections
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 53a99be QPID-8536: [Broker-J] Incorrect check of maximum open connections
53a99be is described below
commit 53a99be667f06dfbe2d3a843c538fbe918375392
Author: Marek Laca <mk...@gmail.com>
AuthorDate: Tue Jun 15 10:33:25 2021 +0200
QPID-8536: [Broker-J] Incorrect check of maximum open connections
This closes #93
---
.../apache/qpid/server/model/port/AmqpPort.java | 8 +--
.../qpid/server/model/port/AmqpPortImpl.java | 72 ++++++++++------------
.../MultiVersionProtocolEngineFactory.java | 3 +-
.../qpid/server/model/port/AmqpPortImplTest.java | 17 +++--
.../server/transport/TCPandSSLTransportTest.java | 2 +-
5 files changed, 46 insertions(+), 56 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index c2b2e3d..6bfcc38 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -172,7 +172,7 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
label = "Open Connections",
description = "Current number of connections made through this port",
metricName = "open_connections_total")
- int getConnectionCount();
+ long getConnectionCount();
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
label = "Total Connections",
@@ -204,11 +204,9 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
+ " 0 disables.")
int getHeartbeatDelay();
- boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
+ boolean acceptNewConnectionAndIncrementCount(final SocketAddress remoteSocketAddress);
- int incrementConnectionCount();
-
- int decrementConnectionCount();
+ long decrementConnectionCount();
int getNetworkBufferSize();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index a3b8156..f59e280 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
@@ -91,7 +90,7 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
@ManagedAttributeField
private int _numberOfSelectors;
- private final AtomicInteger _connectionCount = new AtomicInteger();
+ private final AtomicLong _connectionCount = new AtomicLong();
private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
private final AtomicLong _totalConnectionCount = new AtomicLong();
@@ -551,69 +550,66 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
}
@Override
- public int incrementConnectionCount()
+ public long decrementConnectionCount()
{
- int openConnections = _connectionCount.incrementAndGet();
- _totalConnectionCount.incrementAndGet();
- int maxOpenConnections = getMaxOpenConnections();
- if(maxOpenConnections > 0
- && openConnections > (maxOpenConnections * _connectionWarnCount) / 100
- && _connectionCountWarningGiven.compareAndSet(false, true))
- {
- _container.getEventLogger().message(new PortLogSubject(this),
- PortMessages.CONNECTION_COUNT_WARN(openConnections,
- _connectionWarnCount,
- maxOpenConnections));
- }
- return openConnections;
- }
+ final long maxOpenConnections = getMaxOpenConnections();
+ final long openConnections = _connectionCount.decrementAndGet();
- @Override
- public int decrementConnectionCount()
- {
- int openConnections = _connectionCount.decrementAndGet();
- int maxOpenConnections = getMaxOpenConnections();
-
- if(maxOpenConnections > 0
- && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000)
+ if (maxOpenConnections > 0L
+ && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000L)
{
- _connectionCountWarningGiven.compareAndSet(true,false);
+ _connectionCountWarningGiven.compareAndSet(true, false);
}
-
return openConnections;
}
- private static int square(int val)
+ private static long square(long val)
{
return val * val;
}
@Override
- public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
+ public boolean acceptNewConnectionAndIncrementCount(final SocketAddress remoteSocketAddress)
{
- String addressString = remoteSocketAddress.toString();
+ final String addressString = remoteSocketAddress.toString();
if (_closingOrDeleting.get())
{
_container.getEventLogger().message(new PortLogSubject(this),
- PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+ PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
return false;
}
- else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections)
+ final long maxOpenConnections = getMaxOpenConnections();
+ if (maxOpenConnections > 0L)
{
- _container.getEventLogger().message(new PortLogSubject(this),
- PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
- _maxOpenConnections));
- return false;
+ long openConnections = _connectionCount
+ .getAndUpdate(count -> count < maxOpenConnections ? count + 1L : count);
+
+ if (openConnections >= maxOpenConnections)
+ {
+ _container.getEventLogger().message(new PortLogSubject(this),
+ PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString, _maxOpenConnections));
+ return false;
+ }
+
+ openConnections++;
+ if (openConnections > (maxOpenConnections * _connectionWarnCount) / 100L
+ && _connectionCountWarningGiven.compareAndSet(false, true))
+ {
+ _container.getEventLogger().message(new PortLogSubject(this),
+ PortMessages.CONNECTION_COUNT_WARN(openConnections, _connectionWarnCount, maxOpenConnections));
+ }
}
else
{
- return true;
+ _connectionCount.incrementAndGet();
}
+ _totalConnectionCount.incrementAndGet();
+ return true;
}
@Override
- public int getConnectionCount()
+ public long getConnectionCount()
{
return _connectionCount.get();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
index b9f2303..a29552d 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
@@ -83,9 +83,8 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
@Override
public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
- if(_port.canAcceptNewConnection(remoteSocketAddress))
+ if(_port.acceptNewConnectionAndIncrementCount(remoteSocketAddress))
{
- _port.incrementConnectionCount();
return new MultiVersionProtocolEngine(_broker,
_supported, _defaultSupportedReply, _port, _transport,
ID_GENERATOR.getAndIncrement(),
diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
index df46bda..257c47b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
@@ -335,23 +335,20 @@ public class AmqpPortImplTest extends UnitTestBase
for(int i = 0; i < 8; i++)
{
- assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
- _port.incrementConnectionCount();
- assertEquals((long) (i + 1), (long) _port.getConnectionCount());
+ assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+ assertEquals(i + 1L, _port.getConnectionCount());
verify(mockLogger, never()).message(any(LogSubject.class), any(LogMessage.class));
}
- assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
- _port.incrementConnectionCount();
- assertEquals((long) 9, (long) _port.getConnectionCount());
+ assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+ assertEquals(9, _port.getConnectionCount());
verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
- assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
- _port.incrementConnectionCount();
- assertEquals((long) 10, (long) _port.getConnectionCount());
+ assertTrue(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
+ assertEquals(10, _port.getConnectionCount());
verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
- assertFalse(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+ assertFalse(_port.acceptNewConnectionAndIncrementCount(new InetSocketAddress("example.org", 0)));
}
private AmqpPortImpl createPort(final String portName)
diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 8e24610..51902ef 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -264,7 +264,7 @@ public class TCPandSSLTransportTest extends UnitTestBase
when(port.getPort()).thenReturn(0);
when(port.getName()).thenReturn("testAmqp");
when(port.getNetworkBufferSize()).thenReturn(64*1024);
- when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+ when(port.acceptNewConnectionAndIncrementCount(any(SocketAddress.class))).thenReturn(true);
when(port.getThreadPoolSize()).thenReturn(2);
when(port.getNumberOfSelectors()).thenReturn(1);
when(port.getSSLContext()).thenReturn(sslContext);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org