You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 21:41:38 UTC

svn commit: r1632655 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/logging/messages/ broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-c...

Author: rgodfrey
Date: Fri Oct 17 19:41:37 2014
New Revision: 1632655

URL: http://svn.apache.org/r1632655
Log:
QPID-6165 : [Java Broker] Allow the number of open connections to be limited on a per port basis

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java   (with props)
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
      - copied, changed from r1632318, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java   (with props)
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java?rev=1632655&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java Fri Oct 17 19:41:37 2014
@@ -0,0 +1,215 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.messages;
+
+import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.logging.LogMessage;
+
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
+ *
+ * Generated using GenerateLogMessages and LogMessages.vm
+ * This file is based on the content of Port_logmessages.properties
+ *
+ * To regenerate, edit the templates/properties and run the build with -Dgenerate=true
+ */
+public class PortMessages
+{
+    private static ResourceBundle _messages;
+    private static Locale _currentLocale = BrokerProperties.getLocale();
+
+    public static final String PORT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port";
+    public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.open";
+    public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
+    public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
+    public static final String CONNECTION_REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected";
+    public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
+
+    static
+    {
+        Logger.getLogger(PORT_LOG_HIERARCHY);
+        Logger.getLogger(OPEN_LOG_HIERARCHY);
+        Logger.getLogger(CREATE_LOG_HIERARCHY);
+        Logger.getLogger(CLOSE_LOG_HIERARCHY);
+        Logger.getLogger(CONNECTION_REJECTED_LOG_HIERARCHY);
+        Logger.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
+
+        _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Port_logmessages", _currentLocale);
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1002 : Open</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage OPEN()
+    {
+        String rawMessage = _messages.getString("OPEN");
+
+        final String message = rawMessage;
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return OPEN_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1001 : Create</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CREATE()
+    {
+        String rawMessage = _messages.getString("CREATE");
+
+        final String message = rawMessage;
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CREATE_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1003 : Close</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CLOSE()
+    {
+        String rawMessage = _messages.getString("CLOSE");
+
+        final String message = rawMessage;
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CLOSE_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1005 : Connection from {0} rejected</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CONNECTION_REJECTED(String param1)
+    {
+        String rawMessage = _messages.getString("CONNECTION_REJECTED");
+
+        final Object[] messageArguments = {param1};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CONNECTION_REJECTED_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Port message of the Format:
+     * <pre>PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CONNECTION_COUNT_WARN(Number param1, Number param2, Number param3)
+    {
+        String rawMessage = _messages.getString("CONNECTION_COUNT_WARN");
+
+        final Object[] messageArguments = {param1, param2, param3};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CONNECTION_COUNT_WARN_LOG_HIERARCHY;
+            }
+        };
+    }
+
+
+    private PortMessages()
+    {
+    }
+
+}

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties (from r1632318, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties&r1=1632318&r2=1632655&rev=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties Fri Oct 17 19:41:37 2014
@@ -18,23 +18,9 @@
 #
 # Default File used for all non-defined locales.
 
-CREATE = CHN-1001 : Create
+CREATE = PRT-1001 : Create
+OPEN = PRT-1002 : Open
+CLOSE = PRT-1003 : Close
 # 0 - flow
-FLOW = CHN-1002 : Flow {0}
-CLOSE = CHN-1003 : Close
-CLOSE_FORCED = CHN-1003 : Close : {0,number} - {1}
-
-# 0 - bytes allowed in prefetch
-# 1 - number of messagse.
-PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
-# 0 - queue causing flow control
-FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0})
-FLOW_REMOVED = CHN-1006 : Flow Control Removed
-# Channel Transactions
-# 0 - time in milliseconds
-OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
-IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
-
-DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
-DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
-DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
+CONNECTION_COUNT_WARN = PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}
+CONNECTION_REJECTED = PRT-1005 : Connection from {0} rejected

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java Fri Oct 17 19:41:37 2014
@@ -116,4 +116,11 @@ public class LogSubjectFormat
      */
     public static final String QUEUE_FORMAT = "vh(/{0})/qu({1})";
 
+
+    /**
+     * LOG FORMAT for the Port LogSubject,
+     * 0 - Port number
+     */
+    public static final String PORT_FORMAT = "port({0})";
+
 }

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java?rev=1632655&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java Fri Oct 17 19:41:37 2014
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.PORT_FORMAT;
+
+import org.apache.qpid.server.model.Port;
+
+public class PortLogSubject extends AbstractLogSubject
+{
+    public PortLogSubject(Port<?> port)
+    {
+        /**
+         * LOG FORMAT used by the Port log subject follows
+         * LogSubjectFormat.PORT_FORMAT : port({0}).
+         *
+         * The single placeholder is filled with the value passed to
+         * setLogStringWithFormat below:
+         *
+         * 0 - Port number (port.getPort())
+         */
+        setLogStringWithFormat(PORT_FORMAT, port.getPort());
+
+    }
+}

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model.port;
 
+import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Set;
 
@@ -27,6 +28,7 @@ import org.apache.qpid.server.model.Auth
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedStatistic;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHostAlias;
@@ -41,14 +43,28 @@ public interface AmqpPort<X extends Amqp
 
     String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
     String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
+
     String SEND_BUFFER_SIZE                     = "sendBufferSize";
     String RECEIVE_BUFFER_SIZE                  = "receiveBufferSize";
+    String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
+
 
     String DEFAULT_AMQP_PROTOCOLS = "qpid.port.default_amqp_protocols";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
 
+    String PORT_MAX_OPEN_CONNECTIONS = "qpid.port.max_open_connections";
+
+    @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
+    int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
+
+    String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent";
+
+    @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
+    int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
+
+
     @ManagedAttribute(defaultValue = "*")
     String getBindingAddress();
 
@@ -79,7 +95,19 @@ public interface AmqpPort<X extends Amqp
     @ManagedAttribute( defaultValue = "${" + DEFAULT_AMQP_PROTOCOLS + "}", validValues = {"org.apache.qpid.server.model.port.AmqpPortImpl#getAllAvailableProtocolCombinations()"} )
     Set<Protocol> getProtocols();
 
+    @ManagedAttribute( defaultValue = "${" + PORT_MAX_OPEN_CONNECTIONS + "}" )
+    int getMaxOpenConnections();
+
+    @ManagedStatistic
+    int getConnectionCount();
+
     VirtualHostImpl getVirtualHost(String name);
 
+    boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
+
+    int incrementConnectionCount();
+
+    int decrementConnectionCount();
+
     VirtualHostAlias createVirtualHostAlias(Map<String, Object> attributes);
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Fri Oct 17 19:41:37 2014
@@ -21,6 +21,7 @@ package org.apache.qpid.server.model.por
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.net.SocketAddress;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +31,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.TreeSet;
 
 import javax.net.ssl.KeyManager;
@@ -42,6 +45,8 @@ import org.codehaus.jackson.map.ObjectMa
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.PortMessages;
+import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.DefaultVirtualHostAlias;
 import org.apache.qpid.server.model.HostNameAlias;
@@ -105,6 +110,12 @@ public class AmqpPortImpl extends Abstra
     @ManagedAttributeField
     private String _bindingAddress;
 
+    @ManagedAttributeField
+    private int _maxOpenConnections;
+
+    private final AtomicInteger _connectionCount = new AtomicInteger();
+    private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
+
     private final Broker<?> _broker;
     private AcceptingTransport _transport;
 
@@ -141,6 +152,12 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    public int getMaxOpenConnections()
+    {
+        return _maxOpenConnections;
+    }
+
+    @Override
     protected void onCreate()
     {
         super.onCreate();
@@ -452,4 +469,54 @@ public class AmqpPortImpl extends Abstra
             throw new ServerScopedRuntimeException(e);
         }
     }
+
+    /**
+     * Records one more open connection on this port.
+     *
+     * When a positive connection limit is configured and the new count exceeds
+     * the warning percentage of that limit, a single CONNECTION_COUNT_WARN
+     * message is logged; the warning flag prevents repeats until it is cleared
+     * again by {@link #decrementConnectionCount()}.
+     *
+     * @return the connection count after the increment
+     */
+    @Override
+    public int incrementConnectionCount()
+    {
+        final int openConnections = _connectionCount.incrementAndGet();
+        final int maxOpenConnections = getMaxOpenConnections();
+        if(maxOpenConnections > 0)
+        {
+            // Look the percentage up once so the value compared and the value logged cannot differ.
+            final int warnPercent = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
+            // Widen to long: maxOpenConnections * warnPercent may exceed Integer.MAX_VALUE.
+            final long warnThreshold = ((long) maxOpenConnections * warnPercent) / 100L;
+            if(openConnections > warnThreshold
+               && _connectionCountWarningGiven.compareAndSet(false, true))
+            {
+                _broker.getEventLogger().message(new PortLogSubject(this),
+                                                 PortMessages.CONNECTION_COUNT_WARN(openConnections,
+                                                                                    warnPercent,
+                                                                                    maxOpenConnections));
+            }
+        }
+        return openConnections;
+    }
+
+    /**
+     * Records that one connection on this port has gone away.
+     *
+     * When a positive connection limit is configured, the "warning given" flag
+     * is cleared once the count drops below warnPercent% of the warning
+     * threshold (i.e. limit * warnPercent^2 / 10000), so that a later rise in
+     * connections can log the warning again without flapping around the
+     * threshold itself.
+     *
+     * @return the connection count after the decrement
+     */
+    @Override
+    public int decrementConnectionCount()
+    {
+        final int openConnections = _connectionCount.decrementAndGet();
+        final int maxOpenConnections = getMaxOpenConnections();
+
+        if(maxOpenConnections > 0)
+        {
+            final int warnPercent = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
+            // long arithmetic: limit * percent^2 overflows int for limits above ~335,000 at 80%,
+            // which would make the threshold negative and leave the warning permanently disarmed.
+            final long rearmThreshold = (maxOpenConnections * square(warnPercent)) / 10000L;
+            if(openConnections < rearmThreshold)
+            {
+                _connectionCountWarningGiven.compareAndSet(true, false);
+            }
+        }
+
+        return openConnections;
+    }
+
+    /**
+     * Returns val squared, computed as a long so the product cannot overflow.
+     */
+    private static long square(int val)
+    {
+        return (long) val * val;
+    }
+
+    /**
+     * Reports whether the port is currently below its connection limit.
+     * A negative limit disables the check. The remote address is not
+     * consulted, and no slot is reserved by this call.
+     */
+    @Override
+    public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
+    {
+        final int limit = _maxOpenConnections;
+        if(limit < 0)
+        {
+            return true;
+        }
+        return _connectionCount.get() < limit;
+    }
+
+    /**
+     * Returns the current value of this port's open-connection counter.
+     */
+    @Override
+    public int getConnectionCount()
+    {
+        return _connectionCount.intValue();
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 17 19:41:37 2014
@@ -33,12 +33,13 @@ import javax.net.ssl.SSLPeerUnverifiedEx
 import javax.security.auth.Subject;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -55,24 +56,31 @@ public class MultiVersionProtocolEngine 
     private final SSLContext _sslContext;
     private final boolean _wantClientAuth;
     private final boolean _needClientAuth;
-    private final Port _port;
+    private final AmqpPort<?> _port;
     private final Transport _transport;
     private final ProtocolEngineCreator[] _creators;
+    private final Runnable _onCloseTask;
 
     private Set<Protocol> _supported;
     private String _fqdn;
-    private final Broker _broker;
+    private final Broker<?> _broker;
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
     private final Protocol _defaultSupportedReply;
 
     private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
-    public MultiVersionProtocolEngine(final Broker broker,
-                                      SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth,
+    public MultiVersionProtocolEngine(final Broker<?> broker,
+                                      SSLContext sslContext,
+                                      boolean wantClientAuth,
+                                      boolean needClientAuth,
                                       final Set<Protocol> supported,
                                       final Protocol defaultSupportedReply,
-                                      Port port, Transport transport, final long id, ProtocolEngineCreator[] creators)
+                                      AmqpPort<?> port,
+                                      Transport transport,
+                                      final long id,
+                                      ProtocolEngineCreator[] creators,
+                                      final Runnable onCloseTask)
     {
         if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
         {
@@ -90,6 +98,7 @@ public class MultiVersionProtocolEngine 
         _port = port;
         _transport = transport;
         _creators = creators;
+        _onCloseTask = onCloseTask;
     }
 
 
@@ -115,7 +124,17 @@ public class MultiVersionProtocolEngine 
 
     public void closed()
     {
-        _delegate.closed();
+        try
+        {
+            _delegate.closed();
+        }
+        finally
+        {
+            if(_onCloseTask != null)
+            {
+                _onCloseTask.run();
+            }
+        }
     }
 
     public void writerIdle()
@@ -477,7 +496,8 @@ public class MultiVersionProtocolEngine 
         {
 
             _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
-                                                            _defaultSupportedReply, _port, Transport.SSL, _id, _creators);
+                                                            _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
+                                                            null);
 
             _engine = _sslContext.createSSLEngine();
             _engine.setUseClientMode(false);
@@ -485,11 +505,11 @@ public class MultiVersionProtocolEngine 
 
             if(_needClientAuth)
             {
-                _engine.setNeedClientAuth(_needClientAuth);
+                _engine.setNeedClientAuth(true);
             }
             else if(_wantClientAuth)
             {
-                _engine.setWantClientAuth(_wantClientAuth);
+                _engine.setWantClientAuth(true);
             }
 
             SSLStatus sslStatus = new SSLStatus();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.protocol;
 
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -30,10 +31,12 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.messages.PortMessages;
+import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
 import org.apache.qpid.server.plugin.ProtocolEngineCreatorComparator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -42,15 +45,17 @@ public class MultiVersionProtocolEngineF
 {
     private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
-    private final Broker _broker;
+    private final Broker<?> _broker;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedReply;
     private final SSLContext _sslContext;
     private final boolean _wantClientAuth;
     private final boolean _needClientAuth;
-    private final Port _port;
+    private final AmqpPort<?> _port;
     private final Transport _transport;
     private final ProtocolEngineCreator[] _creators;
+    private final ConnectionCountDecrementingTask
+            _connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
 
     public MultiVersionProtocolEngineFactory(Broker<?> broker,
                                              SSLContext sslContext,
@@ -58,7 +63,7 @@ public class MultiVersionProtocolEngineF
                                              boolean needClientAuth,
                                              final Set<Protocol> supportedVersions,
                                              final Protocol defaultSupportedReply,
-                                             Port port,
+                                             AmqpPort<?> port,
                                              Transport transport)
     {
         if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
@@ -84,11 +89,31 @@ public class MultiVersionProtocolEngineF
         _transport = transport;
     }
 
-    public ServerProtocolEngine newProtocolEngine()
+    public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
     {
-        return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
-                                              _supported, _defaultSupportedReply, _port, _transport,
-                                              ID_GENERATOR.getAndIncrement(),
-                                              _creators);
+        if(_port.canAcceptNewConnection(remoteSocketAddress))
+        {
+            _port.incrementConnectionCount();
+            return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+                                                  _supported, _defaultSupportedReply, _port, _transport,
+                                                  ID_GENERATOR.getAndIncrement(),
+                                                  _creators, _connectionCountDecrementingTask);
+        }
+        else
+        {
+            _broker.getEventLogger().message(new PortLogSubject(_port),
+                                             PortMessages.CONNECTION_REJECTED(remoteSocketAddress.toString()));
+
+            return null;
+        }
+    }
+
+    private class ConnectionCountDecrementingTask implements Runnable
+    {
+        @Override
+        public void run()
+        {
+            _port.decrementConnectionCount();
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java Fri Oct 17 19:41:37 2014
@@ -19,7 +19,11 @@
 
 package org.apache.qpid.server.model.port;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -34,6 +38,8 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
@@ -120,4 +126,41 @@ public class AmqpPortImplTest extends Qp
         serverSocket.bind(new InetSocketAddress(findFreePort()));
         return serverSocket;
     }
+
+    public void testConnectionCounting()
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(AmqpPort.PORT, 0);
+        attributes.put(AmqpPort.NAME, getTestName());
+        attributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+        attributes.put(AmqpPort.MAX_OPEN_CONNECTIONS, 10);
+        attributes.put(AmqpPort.CONTEXT, Collections.singletonMap(AmqpPort.OPEN_CONNECTIONS_WARN_PERCENT, "80"));
+        _port = new AmqpPortImpl(attributes, _broker);
+        _port.create();
+        EventLogger mockLogger = mock(EventLogger.class);
+
+        when(_broker.getEventLogger()).thenReturn(mockLogger);
+
+        for(int i = 0; i < 8; i++)
+        {
+            assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+            _port.incrementConnectionCount();
+            assertEquals(i + 1, _port.getConnectionCount());
+            verify(mockLogger, never()).message(any(LogSubject.class), any(LogMessage.class));
+        }
+
+        assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+        _port.incrementConnectionCount();
+        assertEquals(9, _port.getConnectionCount());
+        verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
+
+        assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+        _port.incrementConnectionCount();
+        assertEquals(10, _port.getConnectionCount());
+        verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
+
+        assertFalse(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Fri Oct 17 19:41:37 2014
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.transport;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.net.InetAddress;
+import java.net.SocketAddress;
 import java.security.KeyStore;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -106,6 +108,7 @@ public class TCPandSSLTransportTest exte
         when(port.getPort()).thenReturn(0);
         when(port.getSendBufferSize()).thenReturn(64*1024);
         when(port.getReceiveBufferSize()).thenReturn(64*1024);
+        when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
                                                               sslContext,
@@ -136,6 +139,8 @@ public class TCPandSSLTransportTest exte
         transport.close();
     }
 
+
+
     // self signed cert keystore valid until Oct 2024
     private static String keystoreString = "/u3+7QAAAAIAAAABAAAAAQAKc2VsZnNpZ25lZAAAAUkYmo+uAAAFATCCBP0wDgYKKwYBBAEqAhEB"
                                            + "AQUABIIE6bR+b7FHo2BRT/WG+zDIfO8zOXoGIbuNL2znNMnvEp9xwfMQOkhKxEbVtX8uJ7HSwi1V"

Modified: qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Oct 17 19:41:37 2014
@@ -60,7 +60,7 @@ class WebSocketProvider implements Accep
     public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
     private final Transport _transport;
     private final SSLContext _sslContext;
-    private final Port<?> _port;
+    private final AmqpPort<?> _port;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedProtocolReply;
     private final ProtocolEngineFactory _factory;
@@ -68,7 +68,7 @@ class WebSocketProvider implements Accep
 
     WebSocketProvider(final Transport transport,
                       final SSLContext sslContext,
-                      final Port<?> port,
+                      final AmqpPort<?> port,
                       final Set<Protocol> supported,
                       final Protocol defaultSupportedProtocolReply)
     {
@@ -207,7 +207,7 @@ class WebSocketProvider implements Accep
         {
             _connection = connection;
 
-            _engine = _factory.newProtocolEngine();
+            _engine = _factory.newProtocolEngine(_remoteAddress);
 
             final ConnectionWrapper connectionWrapper =
                     new ConnectionWrapper(connection, _localAddress, _remoteAddress);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Fri Oct 17 19:41:37 2014
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.protocol;
 
+import java.net.SocketAddress;
+
 public interface ProtocolEngineFactory
 { 
  
   // Returns a new instance of a ProtocolEngine 
-  ProtocolEngine newProtocolEngine();
+  ProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress);
    
-} 
\ No newline at end of file
+} 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Oct 17 19:41:37 2014
@@ -243,28 +243,38 @@ public class IoNetworkTransport implemen
                     try
                     {
                         socket = _serverSocket.accept();
-                        socket.setTcpNoDelay(_config.getTcpNoDelay());
-                        socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
 
-                        final Integer sendBufferSize = _config.getSendBufferSize();
-                        final Integer receiveBufferSize = _config.getReceiveBufferSize();
+                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
 
-                        socket.setSendBufferSize(sendBufferSize);
-                        socket.setReceiveBufferSize(receiveBufferSize);
+                        if(engine != null)
+                        {
+                            socket.setTcpNoDelay(_config.getTcpNoDelay());
+                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+                            final Integer sendBufferSize = _config.getSendBufferSize();
+                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                            socket.setSendBufferSize(sendBufferSize);
+                            socket.setReceiveBufferSize(receiveBufferSize);
 
-                        ProtocolEngine engine = _factory.newProtocolEngine();
 
-                        final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
-                                                                               ticker);
+                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+                            NetworkConnection connection =
+                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+                                                            ticker);
 
-                        connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
 
-                        ticker.setConnection(connection);
+                            ticker.setConnection(connection);
 
-                        engine.setNetworkConnection(connection, connection.getSender());
+                            engine.setNetworkConnection(connection, connection.getSender());
 
-                        connection.start();
+                            connection.start();
+                        }
+                        else
+                        {
+                            socket.close();
+                        }
                     }
                     catch(RuntimeException e)
                     {

Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.protocol;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -153,14 +155,16 @@ public class MultiVersionProtocolEngineF
     {
         Set<Protocol> protocols = getAllAMQPProtocols();
 
-        Port<?> port = mock(Port.class);
+        AmqpPort<?> port = mock(AmqpPort.class);
+        when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+
         when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
         MultiVersionProtocolEngineFactory factory =
             new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
                     org.apache.qpid.server.model.Transport.TCP);
 
         //create a dummy to retrieve the 'current' ID number
-        long previousId = factory.newProtocolEngine().getConnectionId();
+        long previousId = factory.newProtocolEngine(mock(SocketAddress.class)).getConnectionId();
 
         //create a protocol engine and send the AMQP header for all supported AMQP verisons,
         //ensuring the ID assigned increases as expected
@@ -170,7 +174,7 @@ public class MultiVersionProtocolEngineF
             byte[] header = getAmqpHeader(protocol);
             assertNotNull("protocol header should not be null", header);
 
-            ServerProtocolEngine engine = factory.newProtocolEngine();
+            ServerProtocolEngine engine = factory.newProtocolEngine(null);
             TestNetworkConnection conn = new TestNetworkConnection();
             engine.setNetworkConnection(conn, conn.getSender());
             assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org