You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/11/14 15:31:07 UTC
svn commit: r1714336 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server:
logging/messages/PortMessages.java
logging/messages/Port_logmessages.properties model/port/AmqpPortImpl.java
transport/MultiVersionProtocolEngineFactory.java
Author: kwall
Date: Sat Nov 14 14:31:06 2015
New Revision: 1714336
URL: http://svn.apache.org/viewvc?rev=1714336&view=rev
Log:
QPID-6816: Address review comments from Robert Godfrey <rg...@apache.org>
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java Sat Nov 14 14:31:06 2015
@@ -49,7 +49,8 @@ public class PortMessages
public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
public static final String DELETE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.delete";
public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
- public static final String CONNECTION_REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected";
+ public static final String CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_closed";
+ public static final String CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_too_many";
public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
public static final String UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.unsupported_protocol_header";
@@ -60,7 +61,8 @@ public class PortMessages
LoggerFactory.getLogger(CREATE_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETE_LOG_HIERARCHY);
LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
- LoggerFactory.getLogger(CONNECTION_REJECTED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY);
LoggerFactory.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
LoggerFactory.getLogger(UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY);
@@ -291,14 +293,72 @@ public class PortMessages
/**
* Log a Port message of the Format:
- * <pre>PRT-1005 : Connection from {0} rejected</pre>
+ * <pre>PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CONNECTION_REJECTED_TOO_MANY(String param1, Number param2)
+ {
+ String rawMessage = _messages.getString("CONNECTION_REJECTED_TOO_MANY");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1008 : Connection from {0} rejected. Port closed.</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage CONNECTION_REJECTED(String param1)
+ public static LogMessage CONNECTION_REJECTED_CLOSED(String param1)
{
- String rawMessage = _messages.getString("CONNECTION_REJECTED");
+ String rawMessage = _messages.getString("CONNECTION_REJECTED_CLOSED");
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -316,7 +376,7 @@ public class PortMessages
public String getLogHierarchy()
{
- return CONNECTION_REJECTED_LOG_HIERARCHY;
+ return CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY;
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties Sat Nov 14 14:31:06 2015
@@ -21,10 +21,10 @@
CREATE = PRT-1001 : Create "{0}"
OPEN = PRT-1002 : Open
CLOSE = PRT-1003 : Close
-# 0 - flow
CONNECTION_COUNT_WARN = PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}
-CONNECTION_REJECTED = PRT-1005 : Connection from {0} rejected. Maximum connection count for this port already reached.
+CONNECTION_REJECTED_TOO_MANY = PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.
DELETE = PRT-1006 : Delete {0} Port "{1}"
UNSUPPORTED_PROTOCOL_HEADER = PRT-1007 : Unsupported protocol header received, replying with {0}
+CONNECTION_REJECTED_CLOSED = PRT-1008 : Connection from {0} rejected. Port closed.
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Sat Nov 14 14:31:06 2015
@@ -104,7 +104,6 @@ public class AmqpPortImpl extends Abstra
return comparison;
}
};
-
@ManagedAttributeField
private boolean _tcpNoDelay;
@@ -124,9 +123,10 @@ public class AmqpPortImpl extends Abstra
private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
private final Broker<?> _broker;
- private AcceptingTransport _transport;
+ private final int _connectionWarnCount;
private final AtomicBoolean _closing = new AtomicBoolean();
private final SettableFuture _noConnectionsRemain = SettableFuture.create();
+ private AcceptingTransport _transport;
private SSLContext _sslContext;
@ManagedObjectFactoryConstructor
@@ -134,6 +134,7 @@ public class AmqpPortImpl extends Abstra
{
super(attributes, broker);
_broker = broker;
+ _connectionWarnCount = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
}
@Override
@@ -229,7 +230,7 @@ public class AmqpPortImpl extends Abstra
Collection<Transport> transports = getTransports();
TransportProvider transportProvider = null;
- final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+ final HashSet<Transport> transportSet = new HashSet<>(transports);
for (TransportProviderFactory tpf : (new QpidServiceLoader()).instancesOf(TransportProviderFactory.class))
{
if (tpf.getSupportedTransports().contains(transports))
@@ -391,7 +392,7 @@ public class AmqpPortImpl extends Abstra
}
else
{
- Collection<TrustManager> trustManagerList = new ArrayList<TrustManager>();
+ Collection<TrustManager> trustManagerList = new ArrayList<>();
final QpidMultipleTrustManager mulTrustManager = new QpidMultipleTrustManager();
for(TrustStore ts : trustStores)
@@ -564,12 +565,12 @@ public class AmqpPortImpl extends Abstra
int openConnections = _connectionCount.incrementAndGet();
int maxOpenConnections = getMaxOpenConnections();
if(maxOpenConnections > 0
- && openConnections > (maxOpenConnections * getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT)) / 100
+ && openConnections > (maxOpenConnections * _connectionWarnCount) / 100
&& _connectionCountWarningGiven.compareAndSet(false, true))
{
_broker.getEventLogger().message(new PortLogSubject(this),
PortMessages.CONNECTION_COUNT_WARN(openConnections,
- getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT),
+ _connectionWarnCount,
maxOpenConnections));
}
return openConnections;
@@ -582,7 +583,7 @@ public class AmqpPortImpl extends Abstra
int maxOpenConnections = getMaxOpenConnections();
if(maxOpenConnections > 0
- && openConnections < (maxOpenConnections * square(getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT))) / 10000)
+ && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000)
{
_connectionCountWarningGiven.compareAndSet(true,false);
}
@@ -603,7 +604,24 @@ public class AmqpPortImpl extends Abstra
@Override
public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
{
- return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
+ String addressString = remoteSocketAddress.toString();
+ if (_closing.get())
+ {
+ _broker.getEventLogger().message(new PortLogSubject(this),
+ PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+ return false;
+ }
+ else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections)
+ {
+ _broker.getEventLogger().message(new PortLogSubject(this),
+ PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
+ _maxOpenConnections));
+ return false;
+ }
+ else
+ {
+ return true;
+ }
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java Sat Nov 14 14:31:06 2015
@@ -30,8 +30,6 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.logging.messages.PortMessages;
-import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -94,9 +92,6 @@ public class MultiVersionProtocolEngineF
}
else
{
- _broker.getEventLogger().message(new PortLogSubject(_port),
- PortMessages.CONNECTION_REJECTED(remoteSocketAddress.toString()));
-
return null;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org