You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/26 18:52:37 UTC

svn commit: r1710666 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-core/src/main/...

Author: lquack
Date: Mon Oct 26 17:52:37 2015
New Revision: 1710666

URL: http://svn.apache.org/viewvc?rev=1710666&view=rev
Log:
QPID-6807: [Java Broker] make number of selectors on AMQP ports and VirtualHosts configurable

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Mon Oct 26 17:52:37 2015
@@ -243,6 +243,12 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
+    public int getNumberOfSelectors()
+    {
+        return 0;
+    }
+
+    @Override
     public long getQueueCount()
     {
         return 0;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Oct 26 17:52:37 2015
@@ -59,6 +59,7 @@ public interface VirtualHost<X extends V
     String GLOBAL_ADDRESS_DOMAINS               = "globalAddressDomains";
     String VIRTUALHOST_WORK_DIR_VAR             = "virtualhost.work_dir";
     String VIRTUALHOST_WORK_DIR_VAR_EXPRESSION  = "${qpid.work_dir}${file.separator}${ancestor:virtualhost:name}";
+    String NUMBER_OF_SELECTORS                  = "numberOfSelectors";
     String CONNECTION_THREAD_POOL_SIZE          = "connectionThreadPoolSize";
     String CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "connectionThreadPoolKeepAliveTimeout";
 
@@ -119,6 +120,14 @@ public interface VirtualHost<X extends V
     @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE + "}")
     int getConnectionThreadPoolSize();
 
+    String VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = "virtualhost.connectionThreadPool.numberOfSelectors";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS)
+    long DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = Math.max(DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE/8, 1);
+
+    @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS + "}")
+    int getNumberOfSelectors();
+
     @ManagedContextDefault( name = "virtualhost.awaitAttainmentTimeout")
     public static final int DEFAULT_AWAIT_ATTAINMENT_TIMEOUT = 5000;
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Mon Oct 26 17:52:37 2015
@@ -49,11 +49,14 @@ public interface AmqpPort<X extends Amqp
 
     String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
     String THREAD_POOL_SIZE = "threadPoolSize";
+    String NUMBER_OF_SELECTORS = "numberOfSelectors";
 
     String DEFAULT_AMQP_PROTOCOLS = "qpid.port.default_amqp_protocols";
 
-    String PORT_AMQP_THREAD_POOL_SIZE = "port.amqp.threadPool.size";
-    String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "port.amqp.threadPool.keep_alive_timeout";
+    String PORT_AMQP_THREAD_POOL_SIZE = "qpid.port.amqp.threadPool.size";
+    String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "qpid.port.amqp.threadPool.keep_alive_timeout";
+
+    String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -71,6 +74,10 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)
     long DEFAULT_PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_AMQP_NUMBER_OF_SELECTORS)
+    long DEFAULT_PORT_AMQP_NUMBER_OF_SELECTORS = Math.max(DEFAULT_PORT_AMQP_THREAD_POOL_SIZE / 8, 1);
+
     String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
 
     @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
@@ -93,6 +100,9 @@ public interface AmqpPort<X extends Amqp
     @ManagedAttribute( defaultValue = "${" + PORT_AMQP_THREAD_POOL_SIZE + "}")
     int getThreadPoolSize();
 
+    @ManagedAttribute( defaultValue = "${" + PORT_AMQP_NUMBER_OF_SELECTORS + "}")
+    int getNumberOfSelectors();
+
     @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
     boolean getNeedClientAuth();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Mon Oct 26 17:52:37 2015
@@ -117,6 +117,9 @@ public class AmqpPortImpl extends Abstra
     @ManagedAttributeField
     private int _threadPoolSize;
 
+    @ManagedAttributeField
+    private int _numberOfSelectors;
+
     private final AtomicInteger _connectionCount = new AtomicInteger();
     private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
 
@@ -140,6 +143,13 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    public int getNumberOfSelectors()
+    {
+        return _numberOfSelectors;
+    }
+
+
+    @Override
     public SSLContext getSSLContext()
     {
         return _sslContext;
@@ -322,7 +332,7 @@ public class AmqpPortImpl extends Abstra
     public void onValidate()
     {
         super.onValidate();
-        validateThreadPoolSize(this);
+        validateThreadPoolSettings(this);
     }
 
     @Override
@@ -330,17 +340,25 @@ public class AmqpPortImpl extends Abstra
     {
         super.validateChange(proxyForValidation, changedAttributes);
         AmqpPort changed = (AmqpPort) proxyForValidation;
-        if (changedAttributes.contains(THREAD_POOL_SIZE))
+        if (changedAttributes.contains(THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
         {
-            validateThreadPoolSize(changed);
+            validateThreadPoolSettings(changed);
         }
     }
 
-    private void validateThreadPoolSize(final AmqpPort changed)
+    private void validateThreadPoolSettings(final AmqpPort changed)
     {
         if (changed.getThreadPoolSize() < 1)
         {
-            throw new IllegalConfigurationException(String.format("Thread pool size %d is too small. Must be greater than zero.", changed.getThreadPoolSize()));
+            throw new IllegalConfigurationException(String.format("Thread pool size %d on Port %s must be greater than zero.", changed.getThreadPoolSize(), getName()));
+        }
+        if (changed.getNumberOfSelectors() < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Number of Selectors %d on Port %s must be greater than zero.", changed.getNumberOfSelectors(), getName()));
+        }
+        if (changed.getThreadPoolSize() <= changed.getNumberOfSelectors())
+        {
+            throw new IllegalConfigurationException(String.format("Number of Selectors %d on Port %s must be greater than the thread pool size %d.", changed.getNumberOfSelectors(), getName(), changed.getThreadPoolSize()));
         }
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Oct 26 17:52:37 2015
@@ -43,13 +43,14 @@ public class NetworkConnectionScheduler
     private final int _poolSize;
     private final long _threadKeepAliveTimeout;
     private final String _name;
+    private final int _numberOfSelectors;
     private SelectorThread _selectorThread;
 
     public NetworkConnectionScheduler(final String name,
-                                      int threadPoolSize,
+                                      final int numberOfSelectors, int threadPoolSize,
                                       long threadKeepAliveTimeout)
     {
-        this(name, threadPoolSize, threadKeepAliveTimeout, new ThreadFactory()
+        this(name, numberOfSelectors, threadPoolSize, threadKeepAliveTimeout, new ThreadFactory()
                                     {
                                         final AtomicInteger _count = new AtomicInteger();
 
@@ -64,7 +65,7 @@ public class NetworkConnectionScheduler
     }
 
     public NetworkConnectionScheduler(String name,
-                                      int threadPoolSize,
+                                      final int numberOfSelectors, int threadPoolSize,
                                       long threadKeepAliveTimeout,
                                       ThreadFactory factory)
     {
@@ -72,6 +73,7 @@ public class NetworkConnectionScheduler
         _poolSize = threadPoolSize;
         _threadKeepAliveTimeout = threadKeepAliveTimeout;
         _factory = factory;
+        _numberOfSelectors = numberOfSelectors;
     }
 
 
@@ -79,7 +81,7 @@ public class NetworkConnectionScheduler
     {
         try
         {
-            _selectorThread = new SelectorThread(this);
+            _selectorThread = new SelectorThread(this, _numberOfSelectors);
             _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
                                                _threadKeepAliveTimeout, TimeUnit.MINUTES,
                                                new LinkedBlockingQueue<Runnable>(), _factory);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Oct 26 17:52:37 2015
@@ -334,12 +334,11 @@ class SelectorThread extends Thread
 
     private SelectionTask[] _selectionTasks;
 
-    SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
+    SelectorThread(final NetworkConnectionScheduler scheduler, final int numberOfSelectors) throws IOException
     {
         _scheduler = scheduler;
-        int selectors = Math.max(scheduler.getPoolSize()/8,1);
-        _selectionTasks = new SelectionTask[selectors];
-        for(int i = 0; i < selectors; i++)
+        _selectionTasks = new SelectionTask[numberOfSelectors];
+        for(int i = 0; i < numberOfSelectors; i++)
         {
             _selectionTasks[i] = new SelectionTask();
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Oct 26 17:52:37 2015
@@ -72,7 +72,7 @@ class TCPandSSLTransport implements Acce
 
         long threadPoolKeepAliveTimeout = _port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT);
 
-        _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),
+        _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(), _port.getNumberOfSelectors(),
                                                     _port.getThreadPoolSize(), threadPoolKeepAliveTimeout);
         _scheduler.start();
         _networkTransport = new NonBlockingNetworkTransport(protocolEngineFactory,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Oct 26 17:52:37 2015
@@ -101,7 +101,6 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
         implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, EventListener
@@ -186,6 +185,9 @@ public abstract class AbstractVirtualHos
     private int _connectionThreadPoolSize;
 
     @ManagedAttributeField
+    private int _numberOfSelectors;
+
+    @ManagedAttributeField
     private List<String> _enabledConnectionValidators;
 
     @ManagedAttributeField
@@ -282,7 +284,7 @@ public abstract class AbstractVirtualHos
             }
         }
 
-        if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE))
+        if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
         {
             validateConnectionThreadPoolSettings(virtualHost);
         }
@@ -314,7 +316,15 @@ public abstract class AbstractVirtualHos
     {
         if (virtualHost.getConnectionThreadPoolSize() < 1)
         {
-            throw new IllegalConfigurationException(String.format("Thread pool size %d is too small. Must be greater than zero.", virtualHost.getConnectionThreadPoolSize()));
+            throw new IllegalConfigurationException(String.format("Thread pool size %d on VirtualHost %s must be greater than zero.", virtualHost.getConnectionThreadPoolSize(), getName()));
+        }
+        if (virtualHost.getNumberOfSelectors() < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be greater than zero.", virtualHost.getNumberOfSelectors(), getName()));
+        }
+        if (virtualHost.getConnectionThreadPoolSize() <= virtualHost.getNumberOfSelectors())
+        {
+            throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be greater than the connection pool size %d.", virtualHost.getNumberOfSelectors(), getName(), virtualHost.getConnectionThreadPoolSize()));
         }
     }
 
@@ -1481,6 +1491,12 @@ public abstract class AbstractVirtualHos
         return _connectionThreadPoolSize;
     }
 
+    @Override
+    public int getNumberOfSelectors()
+    {
+        return _numberOfSelectors;
+    }
+
     @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
     protected ListenableFuture<Void> doStop()
     {
@@ -1833,7 +1849,7 @@ public abstract class AbstractVirtualHos
         return total;
     }
 
-    @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
+    @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
     private ListenableFuture<Void> onActivate()
     {
         final SuppressingInheritedAccessControlContextThreadFactory housekeepingThreadFactory =
@@ -1896,6 +1912,7 @@ public abstract class AbstractVirtualHos
                                                                           SecurityManager.getSystemTaskSubject("IO Pool", getPrincipal()));
 
         _networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool",
+                                                                     getNumberOfSelectors(),
                                                                      getConnectionThreadPoolSize(),
                                                                      threadPoolKeepAliveTimeout,
                                                                      connectionThreadFactory);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Mon Oct 26 17:52:37 2015
@@ -243,6 +243,12 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
+    public int getNumberOfSelectors()
+    {
+        return 0;
+    }
+
+    @Override
     public long getQueueCount()
     {
         return 0;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Mon Oct 26 17:52:37 2015
@@ -50,6 +50,7 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -261,15 +262,17 @@ public class VirtualHostTest extends Qpi
 
         AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
         assertEquals("Unexpected number of connections before connection registered",
-                0,
-                virtualHost.getConnectionCount());
+                     0,
+                     virtualHost.getConnectionCount());
 
         AMQPConnection modelConnection = mock(AMQPConnection.class);
         when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
         when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
         virtualHost.registerConnection(modelConnection);
 
-        assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount());
+        assertEquals("Unexpected number of connections after connection registered",
+                     1,
+                     virtualHost.getConnectionCount());
 
         virtualHost.delete();
         assertEquals("Unexpected state", State.DELETED, virtualHost.getState());
@@ -295,7 +298,7 @@ public class VirtualHostTest extends Qpi
         assertNotNull(queue.getId());
         assertEquals(queueName, queue.getName());
 
-        verify(_configStore).update(eq(true),matchesRecord(queue.getId(), queue.getType()));
+        verify(_configStore).update(eq(true), matchesRecord(queue.getId(), queue.getType()));
     }
 
     public void testCreateNonDurableQueue()
@@ -324,7 +327,8 @@ public class VirtualHostTest extends Qpi
         String virtualHostName = getName();
         VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
 
-        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(virtualHost);
+        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(
+                virtualHost);
 
         assertNull(virtualHost.getDescription());
 
@@ -348,7 +352,8 @@ public class VirtualHostTest extends Qpi
         String virtualHostName = getName();
         VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
 
-        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(virtualHost);
+        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(
+                virtualHost);
 
         try
         {
@@ -370,7 +375,8 @@ public class VirtualHostTest extends Qpi
         String virtualHostName = getName();
         VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
 
-        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(virtualHost);
+        doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(
+                virtualHost);
 
         try
         {
@@ -395,13 +401,83 @@ public class VirtualHostTest extends Qpi
         verify(connection).block();
     }
 
+    public void testCreateValidation() throws Exception
+    {
+        try
+        {
+            createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1"));
+            fail("Exception not thrown for negative number of selectors");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE, "-1"));
+            fail("Exception not thrown for negative connection thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE));
+            fail("Exception not thrown for number of selectors equal to connection thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+    }
+
+    public void testChangeValidation() throws Exception
+    {
+        VirtualHost<?,?,?> virtualHost = createVirtualHost(getTestName());
+        try
+        {
+            virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1"));
+            fail("Exception not thrown for negative number of selectors");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE,
+                                                                               "-1"));
+            fail("Exception not thrown for negative connection thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE));
+            fail("Exception not thrown for number of selectors equal to connection thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+    }
+
     private VirtualHost<?,?,?> createVirtualHost(final String virtualHostName)
     {
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put(VirtualHost.NAME, virtualHostName);
-        attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        return createVirtualHost(virtualHostName, Collections.<String, Object>emptyMap());
+    }
+
+    private VirtualHost<?,?,?> createVirtualHost(final String virtualHostName, Map<String,Object> attributes)
+    {
+        Map<String, Object> vhAttributes = new HashMap<>();
+        vhAttributes.put(VirtualHost.NAME, virtualHostName);
+        vhAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        vhAttributes.putAll(attributes);
 
-        TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode);
+        TestMemoryVirtualHost host = new TestMemoryVirtualHost(vhAttributes, _virtualHostNode);
         host.addChangeListener(_storeConfigurationChangeListener);
         host.create();
         // Fire the child added event on the node

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java Mon Oct 26 17:52:37 2015
@@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Auth
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -53,7 +54,6 @@ public class AmqpPortImplTest extends Qp
     private static final String AUTHENTICATION_PROVIDER_NAME = "test";
     private TaskExecutor _taskExecutor;
     private Broker _broker;
-    private ServerSocket _socket;
     private AmqpPortImpl _port;
 
     @Override
@@ -84,10 +84,6 @@ public class AmqpPortImplTest extends Qp
     {
         try
         {
-            if (_socket != null)
-            {
-                _socket.close();
-            }
             _taskExecutor.stop();
         }
         finally
@@ -104,28 +100,106 @@ public class AmqpPortImplTest extends Qp
         }
     }
 
-    public void testValidateOnCreate() throws Exception
+    public void testOnCreateValidation() throws Exception
     {
-        _socket = openSocket();
-
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put(AmqpPort.PORT, _socket.getLocalPort());
-        attributes.put(AmqpPort.NAME, getTestName());
-        attributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
-        _port = new AmqpPortImpl(attributes, _broker);
+        ServerSocket socket = openSocket();
         try
         {
-            _port.create();
+            createPort(getTestName(), Collections.<String,Object>singletonMap(AmqpPort.PORT, socket.getLocalPort()));
             fail("Creation should fail due to validation check");
         }
         catch (IllegalConfigurationException e)
         {
             assertEquals("Unexpected exception message",
-                    String.format("Cannot bind to port %d and binding address '%s'. Port is already is use.",
-                        _socket.getLocalPort(), "*"), e.getMessage());
+                         String.format("Cannot bind to port %d and binding address '%s'. Port is already is use.",
+                                       socket.getLocalPort(), "*"), e.getMessage());
+        }
+        finally
+        {
+            socket.close();
+        }
+
+        try
+        {
+            createPort(getTestName(), Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS, "-1"));
+            fail("Exception not thrown for negative number of selectors");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            createPort(getTestName(), Collections.<String, Object>singletonMap(AmqpPort.THREAD_POOL_SIZE, "-1"));
+            fail("Exception not thrown for negative thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            createPort(getTestName(),
+                       Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS,
+                                                                AmqpPort.DEFAULT_PORT_AMQP_THREAD_POOL_SIZE));
+            fail("Exception not thrown for number of selectors equal to thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+    }
+
+    public void testOnChangeThreadPoolValidation() throws Exception
+    {
+        _port = createPort(getTestName());
+        try
+        {
+            _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS, "-1"));
+            fail("Exception not thrown for negative number of selectors");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.THREAD_POOL_SIZE, "-1"));
+            fail("Exception not thrown for negative thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
+        }
+        try
+        {
+            _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS,
+                                                                         AmqpPort.DEFAULT_PORT_AMQP_THREAD_POOL_SIZE));
+            fail("Exception not thrown for number of selectors equal to thread pool size");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // pass
         }
     }
 
+    private AmqpPortImpl createPort(final String portName)
+    {
+        return createPort(portName, Collections.<String, Object>emptyMap());
+    }
+
+    private AmqpPortImpl createPort(final String portName, final Map<String, Object> attributes)
+    {
+        Map<String, Object> portAttributes = new HashMap<>();
+        portAttributes.put(AmqpPort.PORT, 0);
+        portAttributes.put(AmqpPort.NAME, portName);
+        portAttributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+        portAttributes.putAll(attributes);
+        AmqpPortImpl port = new AmqpPortImpl(portAttributes, _broker);
+        port.create();
+        return port;
+    }
+
     private ServerSocket openSocket() throws IOException
     {
         ServerSocket serverSocket = new ServerSocket();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Mon Oct 26 17:52:37 2015
@@ -110,6 +110,7 @@ public class TCPandSSLTransportTest exte
         when(port.getNetworkBufferSize()).thenReturn(64*1024);
         when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
         when(port.getThreadPoolSize()).thenReturn(2);
+        when(port.getNumberOfSelectors()).thenReturn(1);
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org