You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 21:41:38 UTC
svn commit: r1632655 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/logging/messages/
broker-core/src/main/java/org/apache/qpid/server/logging/subjects/
broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-c...
Author: rgodfrey
Date: Fri Oct 17 19:41:37 2014
New Revision: 1632655
URL: http://svn.apache.org/r1632655
Log:
QPID-6165 : [Java Broker] Allow the number of open connections to be limited on a per port basis
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java (with props)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
- copied, changed from r1632318, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java (with props)
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java?rev=1632655&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java Fri Oct 17 19:41:37 2014
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.messages;
+
+import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.logging.LogMessage;
+
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
+ *
+ * Generated using GenerateLogMessages and LogMessages.vm
+ * This file is based on the content of Port_logmessages.properties
+ *
+ * To regenerate, edit the templates/properties and run the build with -Dgenerate=true
+ */
+public class PortMessages
+{
+ private static ResourceBundle _messages;
+ private static Locale _currentLocale = BrokerProperties.getLocale();
+
+ public static final String PORT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port";
+ public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.open";
+ public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
+ public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
+ public static final String CONNECTION_REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected";
+ public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
+
+ static
+ {
+ Logger.getLogger(PORT_LOG_HIERARCHY);
+ Logger.getLogger(OPEN_LOG_HIERARCHY);
+ Logger.getLogger(CREATE_LOG_HIERARCHY);
+ Logger.getLogger(CLOSE_LOG_HIERARCHY);
+ Logger.getLogger(CONNECTION_REJECTED_LOG_HIERARCHY);
+ Logger.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
+
+ _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Port_logmessages", _currentLocale);
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1002 : Open</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage OPEN()
+ {
+ String rawMessage = _messages.getString("OPEN");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return OPEN_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1001 : Create</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CREATE()
+ {
+ String rawMessage = _messages.getString("CREATE");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CREATE_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1003 : Close</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CLOSE()
+ {
+ String rawMessage = _messages.getString("CLOSE");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CLOSE_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1005 : Connection from {0} rejected</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CONNECTION_REJECTED(String param1)
+ {
+ String rawMessage = _messages.getString("CONNECTION_REJECTED");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CONNECTION_REJECTED_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CONNECTION_COUNT_WARN(Number param1, Number param2, Number param3)
+ {
+ String rawMessage = _messages.getString("CONNECTION_COUNT_WARN");
+
+ final Object[] messageArguments = {param1, param2, param3};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CONNECTION_COUNT_WARN_LOG_HIERARCHY;
+ }
+ };
+ }
+
+
+ private PortMessages()
+ {
+ }
+
+}
Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties (from r1632318, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties&r1=1632318&r2=1632655&rev=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties Fri Oct 17 19:41:37 2014
@@ -18,23 +18,9 @@
#
# Default File used for all non-defined locales.
-CREATE = CHN-1001 : Create
+CREATE = PRT-1001 : Create
+OPEN = PRT-1002 : Open
+CLOSE = PRT-1003 : Close
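-# 0 - flow
+# 0 - current connection count
+# 1 - warning threshold as a percentage of the maximum
+# 2 - maximum number of open connections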
-FLOW = CHN-1002 : Flow {0}
-CLOSE = CHN-1003 : Close
-CLOSE_FORCED = CHN-1003 : Close : {0,number} - {1}
-
-# 0 - bytes allowed in prefetch
-# 1 - number of messagse.
-PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
-# 0 - queue causing flow control
-FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0})
-FLOW_REMOVED = CHN-1006 : Flow Control Removed
-# Channel Transactions
-# 0 - time in milliseconds
-OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
-IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
-
-DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
-DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
-DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
+CONNECTION_COUNT_WARN = PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}
+CONNECTION_REJECTED = PRT-1005 : Connection from {0} rejected
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java Fri Oct 17 19:41:37 2014
@@ -116,4 +116,11 @@ public class LogSubjectFormat
*/
public static final String QUEUE_FORMAT = "vh(/{0})/qu({1})";
+
+ /**
+ * LOG FORMAT for the Port LogSubject,
+ * 0 - Port number
+ */
+ public static final String PORT_FORMAT = "port({0})";
+
}
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java?rev=1632655&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java Fri Oct 17 19:41:37 2014
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.PORT_FORMAT;
+
+import org.apache.qpid.server.model.Port;
+
+public class PortLogSubject extends AbstractLogSubject
+{
+ public PortLogSubject(Port<?> port)
+ {
+ /**
+ * LOG FORMAT used by the port log subject follows
+ * LogSubjectFormat.PORT_FORMAT : port({0}).
+ *
+ * The format and its argument are handed to setLogStringWithFormat
+ * to build the log string, using this index:
+ *
+ * 0 - Port number (the value returned by port.getPort())
+ *
+ * No connection, user, IP, virtualhost or channel values are part of
+ * this format; only the port number is supplied.
+ *
+ */
+ setLogStringWithFormat(PORT_FORMAT, port.getPort());
+
+ }
+}
Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model.port;
+import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
@@ -27,6 +28,7 @@ import org.apache.qpid.server.model.Auth
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedStatistic;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHostAlias;
@@ -41,14 +43,28 @@ public interface AmqpPort<X extends Amqp
String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
+
String SEND_BUFFER_SIZE = "sendBufferSize";
String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
+ String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
+
String DEFAULT_AMQP_PROTOCOLS = "qpid.port.default_amqp_protocols";
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
+ String PORT_MAX_OPEN_CONNECTIONS = "qpid.port.max_open_connections";
+
+ @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
+ int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
+
+ String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent";
+
+ @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
+ int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
+
+
@ManagedAttribute(defaultValue = "*")
String getBindingAddress();
@@ -79,7 +95,19 @@ public interface AmqpPort<X extends Amqp
@ManagedAttribute( defaultValue = "${" + DEFAULT_AMQP_PROTOCOLS + "}", validValues = {"org.apache.qpid.server.model.port.AmqpPortImpl#getAllAvailableProtocolCombinations()"} )
Set<Protocol> getProtocols();
+ @ManagedAttribute( defaultValue = "${" + PORT_MAX_OPEN_CONNECTIONS + "}" )
+ int getMaxOpenConnections();
+
+ @ManagedStatistic
+ int getConnectionCount();
+
VirtualHostImpl getVirtualHost(String name);
+ boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
+
+ int incrementConnectionCount();
+
+ int decrementConnectionCount();
+
VirtualHostAlias createVirtualHostAlias(Map<String, Object> attributes);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Fri Oct 17 19:41:37 2014
@@ -21,6 +21,7 @@ package org.apache.qpid.server.model.por
import java.io.IOException;
import java.io.StringWriter;
+import java.net.SocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
@@ -30,6 +31,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.TreeSet;
import javax.net.ssl.KeyManager;
@@ -42,6 +45,8 @@ import org.codehaus.jackson.map.ObjectMa
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.PortMessages;
+import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.DefaultVirtualHostAlias;
import org.apache.qpid.server.model.HostNameAlias;
@@ -105,6 +110,12 @@ public class AmqpPortImpl extends Abstra
@ManagedAttributeField
private String _bindingAddress;
+ @ManagedAttributeField
+ private int _maxOpenConnections;
+
+ private final AtomicInteger _connectionCount = new AtomicInteger();
+ private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
+
private final Broker<?> _broker;
private AcceptingTransport _transport;
@@ -141,6 +152,12 @@ public class AmqpPortImpl extends Abstra
}
@Override
+ public int getMaxOpenConnections()
+ {
+ return _maxOpenConnections;
+ }
+
+ @Override
protected void onCreate()
{
super.onCreate();
@@ -452,4 +469,54 @@ public class AmqpPortImpl extends Abstra
throw new ServerScopedRuntimeException(e);
}
}
+
+ /**
+  * Counts one more open connection on this port and, when a positive
+  * connection limit is configured, logs a single CONNECTION_COUNT_WARN
+  * message once the count rises above the warning threshold
+  * (limit * OPEN_CONNECTIONS_WARN_PERCENT / 100).
+  *
+  * The warning flag is flipped with compareAndSet, so only the caller that
+  * changes it from false to true emits the message; it stays set until
+  * decrementConnectionCount() clears it again.
+  *
+  * @return the connection count after this connection has been added
+  */
+ @Override
+ public int incrementConnectionCount()
+ {
+     final int openConnections = _connectionCount.incrementAndGet();
+     final int maxOpenConnections = getMaxOpenConnections();
+     if(maxOpenConnections > 0)
+     {
+         // Read the percentage once so that the threshold that is tested and
+         // the percentage that is reported in the log message always agree.
+         final int warnPercent = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
+
+         // Multiply as long: limit * percent can exceed Integer.MAX_VALUE for
+         // large limits, and a wrapped int would yield a bogus threshold.
+         final long warnThreshold = ((long) maxOpenConnections * warnPercent) / 100L;
+
+         if(openConnections > warnThreshold
+            && _connectionCountWarningGiven.compareAndSet(false, true))
+         {
+             _broker.getEventLogger().message(new PortLogSubject(this),
+                                              PortMessages.CONNECTION_COUNT_WARN(openConnections,
+                                                                                 warnPercent,
+                                                                                 maxOpenConnections));
+         }
+     }
+     return openConnections;
+ }
+
+ /**
+  * Counts one fewer open connection on this port and, when a positive
+  * connection limit is configured, re-arms the connection count warning
+  * once the count has dropped below
+  * limit * (OPEN_CONNECTIONS_WARN_PERCENT / 100)^2.
+  *
+  * The re-arm level is deliberately lower than the level at which
+  * incrementConnectionCount() warns (the percentage is applied twice), so
+  * a count hovering around the warning threshold does not log repeatedly.
+  *
+  * @return the connection count after this connection has been removed
+  */
+ @Override
+ public int decrementConnectionCount()
+ {
+     final int openConnections = _connectionCount.decrementAndGet();
+     final int maxOpenConnections = getMaxOpenConnections();
+
+     if(maxOpenConnections > 0)
+     {
+         final int warnPercent = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
+
+         // Widen to long before multiplying: limit * percent^2 exceeds
+         // Integer.MAX_VALUE for limits in the hundreds of thousands, and a
+         // wrapped int would stop the warning from ever being re-armed.
+         final long rearmThreshold = ((long) maxOpenConnections * square(warnPercent)) / 10000L;
+
+         if(openConnections < rearmThreshold)
+         {
+             _connectionCountWarningGiven.compareAndSet(true, false);
+         }
+     }
+
+     return openConnections;
+ }
+
+ /**
+  * Returns val multiplied by itself, using int arithmetic.
+  * NOTE(review): only called with the warning percentage here, which is
+  * presumably in the range 0-100 and so cannot overflow - confirm if reused.
+  */
+ private static int square(int val)
+ {
+     return val * val;
+ }
+
+ /**
+  * Reports whether another connection may be opened on this port.
+  *
+  * A negative maxOpenConnections means no limit is applied; otherwise the
+  * current connection count must be strictly below the limit.
+  *
+  * @param remoteSocketAddress address of the connecting peer; not consulted
+  *                            by this implementation
+  * @return true if a new connection may be accepted, false otherwise
+  */
+ @Override
+ public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
+ {
+     if(_maxOpenConnections < 0)
+     {
+         return true;
+     }
+     return _connectionCount.get() < _maxOpenConnections;
+ }
+
+ @Override
+ public int getConnectionCount() // current value of the counter maintained by incrementConnectionCount()/decrementConnectionCount()
+ {
+ return _connectionCount.get();
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 17 19:41:37 2014
@@ -33,12 +33,13 @@ import javax.net.ssl.SSLPeerUnverifiedEx
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -55,24 +56,31 @@ public class MultiVersionProtocolEngine
private final SSLContext _sslContext;
private final boolean _wantClientAuth;
private final boolean _needClientAuth;
- private final Port _port;
+ private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
+ private final Runnable _onCloseTask;
private Set<Protocol> _supported;
private String _fqdn;
- private final Broker _broker;
+ private final Broker<?> _broker;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- public MultiVersionProtocolEngine(final Broker broker,
- SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth,
+ public MultiVersionProtocolEngine(final Broker<?> broker,
+ SSLContext sslContext,
+ boolean wantClientAuth,
+ boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
- Port port, Transport transport, final long id, ProtocolEngineCreator[] creators)
+ AmqpPort<?> port,
+ Transport transport,
+ final long id,
+ ProtocolEngineCreator[] creators,
+ final Runnable onCloseTask)
{
if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
{
@@ -90,6 +98,7 @@ public class MultiVersionProtocolEngine
_port = port;
_transport = transport;
_creators = creators;
+ _onCloseTask = onCloseTask;
}
@@ -115,7 +124,17 @@ public class MultiVersionProtocolEngine
public void closed()
{
- _delegate.closed();
+ try
+ {
+ _delegate.closed();
+ }
+ finally
+ {
+ if(_onCloseTask != null)
+ {
+ _onCloseTask.run();
+ }
+ }
}
public void writerIdle()
@@ -477,7 +496,8 @@ public class MultiVersionProtocolEngine
{
_decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators);
+ _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
+ null);
_engine = _sslContext.createSSLEngine();
_engine.setUseClientMode(false);
@@ -485,11 +505,11 @@ public class MultiVersionProtocolEngine
if(_needClientAuth)
{
- _engine.setNeedClientAuth(_needClientAuth);
+ _engine.setNeedClientAuth(true);
}
else if(_wantClientAuth)
{
- _engine.setWantClientAuth(_wantClientAuth);
+ _engine.setWantClientAuth(true);
}
SSLStatus sslStatus = new SSLStatus();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -30,10 +31,12 @@ import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.messages.PortMessages;
+import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.server.plugin.ProtocolEngineCreatorComparator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -42,15 +45,17 @@ public class MultiVersionProtocolEngineF
{
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
- private final Broker _broker;
+ private final Broker<?> _broker;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedReply;
private final SSLContext _sslContext;
private final boolean _wantClientAuth;
private final boolean _needClientAuth;
- private final Port _port;
+ private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
+ private final ConnectionCountDecrementingTask
+ _connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
public MultiVersionProtocolEngineFactory(Broker<?> broker,
SSLContext sslContext,
@@ -58,7 +63,7 @@ public class MultiVersionProtocolEngineF
boolean needClientAuth,
final Set<Protocol> supportedVersions,
final Protocol defaultSupportedReply,
- Port port,
+ AmqpPort<?> port,
Transport transport)
{
if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
@@ -84,11 +89,31 @@ public class MultiVersionProtocolEngineF
_transport = transport;
}
- public ServerProtocolEngine newProtocolEngine()
+ public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
- return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
- _supported, _defaultSupportedReply, _port, _transport,
- ID_GENERATOR.getAndIncrement(),
- _creators);
+ if(_port.canAcceptNewConnection(remoteSocketAddress))
+ {
+ _port.incrementConnectionCount();
+ return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ _supported, _defaultSupportedReply, _port, _transport,
+ ID_GENERATOR.getAndIncrement(),
+ _creators, _connectionCountDecrementingTask);
+ }
+ else
+ {
+ _broker.getEventLogger().message(new PortLogSubject(_port),
+ PortMessages.CONNECTION_REJECTED(remoteSocketAddress.toString()));
+
+ return null;
+ }
+ }
+
+ private class ConnectionCountDecrementingTask implements Runnable // passed to each MultiVersionProtocolEngine created by newProtocolEngine as its on-close task
+ {
+ @Override
+ public void run()
+ {
+ _port.decrementConnectionCount(); // balances the incrementConnectionCount() made when the engine was created
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java Fri Oct 17 19:41:37 2014
@@ -19,7 +19,11 @@
package org.apache.qpid.server.model.port;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -34,6 +38,8 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -120,4 +126,41 @@ public class AmqpPortImplTest extends Qp
serverSocket.bind(new InetSocketAddress(findFreePort()));
return serverSocket;
}
+
+ public void testConnectionCounting()
+ {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(AmqpPort.PORT, 0);
+ attributes.put(AmqpPort.NAME, getTestName());
+ attributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+ attributes.put(AmqpPort.MAX_OPEN_CONNECTIONS, 10);
+ attributes.put(AmqpPort.CONTEXT, Collections.singletonMap(AmqpPort.OPEN_CONNECTIONS_WARN_PERCENT, "80"));
+ _port = new AmqpPortImpl(attributes, _broker);
+ _port.create();
+ EventLogger mockLogger = mock(EventLogger.class);
+
+ when(_broker.getEventLogger()).thenReturn(mockLogger);
+
+ for(int i = 0; i < 8; i++)
+ {
+ assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+ _port.incrementConnectionCount();
+ assertEquals(i + 1, _port.getConnectionCount());
+ verify(mockLogger, never()).message(any(LogSubject.class), any(LogMessage.class));
+ }
+
+ assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+ _port.incrementConnectionCount();
+ assertEquals(9, _port.getConnectionCount());
+ verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
+
+ assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+ _port.incrementConnectionCount();
+ assertEquals(10, _port.getConnectionCount());
+ verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class));
+
+ assertFalse(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0)));
+
+
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Fri Oct 17 19:41:37 2014
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.transport;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.net.InetAddress;
+import java.net.SocketAddress;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.HashSet;
@@ -106,6 +108,7 @@ public class TCPandSSLTransportTest exte
when(port.getPort()).thenReturn(0);
when(port.getSendBufferSize()).thenReturn(64*1024);
when(port.getReceiveBufferSize()).thenReturn(64*1024);
+ when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
sslContext,
@@ -136,6 +139,8 @@ public class TCPandSSLTransportTest exte
transport.close();
}
+
+
// self signed cert keystore valid until Oct 2024
private static String keystoreString = "/u3+7QAAAAIAAAABAAAAAQAKc2VsZnNpZ25lZAAAAUkYmo+uAAAFATCCBP0wDgYKKwYBBAEqAhEB"
+ "AQUABIIE6bR+b7FHo2BRT/WG+zDIfO8zOXoGIbuNL2znNMnvEp9xwfMQOkhKxEbVtX8uJ7HSwi1V"
Modified: qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Oct 17 19:41:37 2014
@@ -60,7 +60,7 @@ class WebSocketProvider implements Accep
public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
private final Transport _transport;
private final SSLContext _sslContext;
- private final Port<?> _port;
+ private final AmqpPort<?> _port;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedProtocolReply;
private final ProtocolEngineFactory _factory;
@@ -68,7 +68,7 @@ class WebSocketProvider implements Accep
WebSocketProvider(final Transport transport,
final SSLContext sslContext,
- final Port<?> port,
+ final AmqpPort<?> port,
final Set<Protocol> supported,
final Protocol defaultSupportedProtocolReply)
{
@@ -207,7 +207,7 @@ class WebSocketProvider implements Accep
{
_connection = connection;
- _engine = _factory.newProtocolEngine();
+ _engine = _factory.newProtocolEngine(_remoteAddress);
final ConnectionWrapper connectionWrapper =
new ConnectionWrapper(connection, _localAddress, _remoteAddress);
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Fri Oct 17 19:41:37 2014
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.protocol;
+import java.net.SocketAddress;
+
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine();
+ ProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress);
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Oct 17 19:41:37 2014
@@ -243,28 +243,38 @@ public class IoNetworkTransport implemen
try
{
socket = _serverSocket.accept();
- socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
+ ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
+ if(engine != null)
+ {
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
- ProtocolEngine engine = _factory.newProtocolEngine();
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
- NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
- ticker);
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+ NetworkConnection connection =
+ new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+ ticker);
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+ connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
- ticker.setConnection(connection);
+ ticker.setConnection(connection);
- engine.setNetworkConnection(connection, connection.getSender());
+ engine.setNetworkConnection(connection, connection.getSender());
- connection.start();
+ connection.start();
+ }
+ else
+ {
+ socket.close();
+ }
}
catch(RuntimeException e)
{
Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1632655&r1=1632654&r2=1632655&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Fri Oct 17 19:41:37 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.apache.qpid.protocol.ServerPr
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -153,14 +155,16 @@ public class MultiVersionProtocolEngineF
{
Set<Protocol> protocols = getAllAMQPProtocols();
- Port<?> port = mock(Port.class);
+ AmqpPort<?> port = mock(AmqpPort.class);
+ when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+
when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
MultiVersionProtocolEngineFactory factory =
new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
- long previousId = factory.newProtocolEngine().getConnectionId();
+ long previousId = factory.newProtocolEngine(mock(SocketAddress.class)).getConnectionId();
//create a protocol engine and send the AMQP header for all supported AMQP versions,
//ensuring the ID assigned increases as expected
@@ -170,7 +174,7 @@ public class MultiVersionProtocolEngineF
byte[] header = getAmqpHeader(protocol);
assertNotNull("protocol header should not be null", header);
- ServerProtocolEngine engine = factory.newProtocolEngine();
+ ServerProtocolEngine engine = factory.newProtocolEngine(null);
TestNetworkConnection conn = new TestNetworkConnection();
engine.setNetworkConnection(conn, conn.getSender());
assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org