You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/11/14 15:31:07 UTC

svn commit: r1714336 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server: logging/messages/PortMessages.java logging/messages/Port_logmessages.properties model/port/AmqpPortImpl.java transport/MultiVersionProtocolEngineFactory.java

Author: kwall
Date: Sat Nov 14 14:31:06 2015
New Revision: 1714336

URL: http://svn.apache.org/viewvc?rev=1714336&view=rev
Log:
QPID-6816: Address review comments from Robert Godfrey <rg...@apache.org>

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java Sat Nov 14 14:31:06 2015
@@ -49,7 +49,8 @@ public class PortMessages
     public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
     public static final String DELETE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.delete";
     public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
-    public static final String CONNECTION_REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected";
+    public static final String CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_closed";
+    public static final String CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_too_many";
     public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
     public static final String UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.unsupported_protocol_header";
 
@@ -60,7 +61,8 @@ public class PortMessages
         LoggerFactory.getLogger(CREATE_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETE_LOG_HIERARCHY);
         LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
-        LoggerFactory.getLogger(CONNECTION_REJECTED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY);
         LoggerFactory.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
         LoggerFactory.getLogger(UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY);
 
@@ -291,14 +293,72 @@ public class PortMessages
 
     /**
      * Log a Port message of the Format:
-     * <pre>PRT-1005 : Connection from {0} rejected</pre>
+     * <pre>PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CONNECTION_REJECTED_TOO_MANY(String param1, Number param2)
+    {
+        String rawMessage = _messages.getString("CONNECTION_REJECTED_TOO_MANY");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1008 : Connection from {0} rejected. Port closed.</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage CONNECTION_REJECTED(String param1)
+    public static LogMessage CONNECTION_REJECTED_CLOSED(String param1)
     {
-        String rawMessage = _messages.getString("CONNECTION_REJECTED");
+        String rawMessage = _messages.getString("CONNECTION_REJECTED_CLOSED");
 
         final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
@@ -316,7 +376,7 @@ public class PortMessages
 
             public String getLogHierarchy()
             {
-                return CONNECTION_REJECTED_LOG_HIERARCHY;
+                return CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY;
             }
 
             @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties Sat Nov 14 14:31:06 2015
@@ -21,10 +21,10 @@
 CREATE = PRT-1001 : Create "{0}"
 OPEN = PRT-1002 : Open
 CLOSE = PRT-1003 : Close
-# 0 - flow
 CONNECTION_COUNT_WARN = PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}
-CONNECTION_REJECTED = PRT-1005 : Connection from {0} rejected. Maximum connection count for this port already reached.
+CONNECTION_REJECTED_TOO_MANY = PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.
 
 DELETE = PRT-1006 : Delete {0} Port "{1}"
 
 UNSUPPORTED_PROTOCOL_HEADER = PRT-1007 : Unsupported protocol header received, replying with {0}
+CONNECTION_REJECTED_CLOSED = PRT-1008 : Connection from {0} rejected. Port closed.

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Sat Nov 14 14:31:06 2015
@@ -104,7 +104,6 @@ public class AmqpPortImpl extends Abstra
             return comparison;
         }
     };
-
     @ManagedAttributeField
     private boolean _tcpNoDelay;
 
@@ -124,9 +123,10 @@ public class AmqpPortImpl extends Abstra
     private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
 
     private final Broker<?> _broker;
-    private AcceptingTransport _transport;
+    private final int _connectionWarnCount;
     private final AtomicBoolean _closing = new AtomicBoolean();
     private final SettableFuture _noConnectionsRemain = SettableFuture.create();
+    private AcceptingTransport _transport;
     private SSLContext _sslContext;
 
     @ManagedObjectFactoryConstructor
@@ -134,6 +134,7 @@ public class AmqpPortImpl extends Abstra
     {
         super(attributes, broker);
         _broker = broker;
+        _connectionWarnCount = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
     }
 
     @Override
@@ -229,7 +230,7 @@ public class AmqpPortImpl extends Abstra
             Collection<Transport> transports = getTransports();
 
             TransportProvider transportProvider = null;
-            final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+            final HashSet<Transport> transportSet = new HashSet<>(transports);
             for (TransportProviderFactory tpf : (new QpidServiceLoader()).instancesOf(TransportProviderFactory.class))
             {
                 if (tpf.getSupportedTransports().contains(transports))
@@ -391,7 +392,7 @@ public class AmqpPortImpl extends Abstra
             }
             else
             {
-                Collection<TrustManager> trustManagerList = new ArrayList<TrustManager>();
+                Collection<TrustManager> trustManagerList = new ArrayList<>();
                 final QpidMultipleTrustManager mulTrustManager = new QpidMultipleTrustManager();
 
                 for(TrustStore ts : trustStores)
@@ -564,12 +565,12 @@ public class AmqpPortImpl extends Abstra
         int openConnections = _connectionCount.incrementAndGet();
         int maxOpenConnections = getMaxOpenConnections();
         if(maxOpenConnections > 0
-           && openConnections > (maxOpenConnections * getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT)) / 100
+           && openConnections > (maxOpenConnections * _connectionWarnCount) / 100
            && _connectionCountWarningGiven.compareAndSet(false, true))
         {
             _broker.getEventLogger().message(new PortLogSubject(this),
                                              PortMessages.CONNECTION_COUNT_WARN(openConnections,
-                                                                                getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT),
+                                                                                _connectionWarnCount,
                                                                                 maxOpenConnections));
         }
         return openConnections;
@@ -582,7 +583,7 @@ public class AmqpPortImpl extends Abstra
         int maxOpenConnections = getMaxOpenConnections();
 
         if(maxOpenConnections > 0
-           && openConnections < (maxOpenConnections * square(getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT))) / 10000)
+           && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000)
         {
            _connectionCountWarningGiven.compareAndSet(true,false);
         }
@@ -603,7 +604,24 @@ public class AmqpPortImpl extends Abstra
     @Override
     public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
     {
-        return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
+        String addressString = remoteSocketAddress.toString();
+        if (_closing.get())
+        {
+            _broker.getEventLogger().message(new PortLogSubject(this),
+                                             PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+            return false;
+        }
+        else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections)
+        {
+            _broker.getEventLogger().message(new PortLogSubject(this),
+                                             PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
+                                                                                       _maxOpenConnections));
+            return false;
+        }
+        else
+        {
+            return true;
+        }
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java?rev=1714336&r1=1714335&r2=1714336&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java Sat Nov 14 14:31:06 2015
@@ -30,8 +30,6 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.logging.messages.PortMessages;
-import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
@@ -94,9 +92,6 @@ public class MultiVersionProtocolEngineF
         }
         else
         {
-            _broker.getEventLogger().message(new PortLogSubject(_port),
-                                             PortMessages.CONNECTION_REJECTED(remoteSocketAddress.toString()));
-
             return null;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org