You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/26 18:52:37 UTC
svn commit: r1710666 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/model/port/
broker-core/src/main/...
Author: lquack
Date: Mon Oct 26 17:52:37 2015
New Revision: 1710666
URL: http://svn.apache.org/viewvc?rev=1710666&view=rev
Log:
QPID-6807: [Java Broker] make number of selectors on AMQP ports and VirtualHosts configurable
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Mon Oct 26 17:52:37 2015
@@ -243,6 +243,12 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
+ public int getNumberOfSelectors()
+ {
+ return 0;
+ }
+
+ @Override
public long getQueueCount()
{
return 0;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Oct 26 17:52:37 2015
@@ -59,6 +59,7 @@ public interface VirtualHost<X extends V
String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains";
String VIRTUALHOST_WORK_DIR_VAR = "virtualhost.work_dir";
String VIRTUALHOST_WORK_DIR_VAR_EXPRESSION = "${qpid.work_dir}${file.separator}${ancestor:virtualhost:name}";
+ String NUMBER_OF_SELECTORS = "numberOfSelectors";
String CONNECTION_THREAD_POOL_SIZE = "connectionThreadPoolSize";
String CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "connectionThreadPoolKeepAliveTimeout";
@@ -119,6 +120,14 @@ public interface VirtualHost<X extends V
@ManagedAttribute( defaultValue = "${" + VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE + "}")
int getConnectionThreadPoolSize();
+ String VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = "virtualhost.connectionThreadPool.numberOfSelectors";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS)
+ long DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = Math.max(DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE/8, 1);
+
+ @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS + "}")
+ int getNumberOfSelectors();
+
@ManagedContextDefault( name = "virtualhost.awaitAttainmentTimeout")
public static final int DEFAULT_AWAIT_ATTAINMENT_TIMEOUT = 5000;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Mon Oct 26 17:52:37 2015
@@ -49,11 +49,14 @@ public interface AmqpPort<X extends Amqp
String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
String THREAD_POOL_SIZE = "threadPoolSize";
+ String NUMBER_OF_SELECTORS = "numberOfSelectors";
String DEFAULT_AMQP_PROTOCOLS = "qpid.port.default_amqp_protocols";
- String PORT_AMQP_THREAD_POOL_SIZE = "port.amqp.threadPool.size";
- String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "port.amqp.threadPool.keep_alive_timeout";
+ String PORT_AMQP_THREAD_POOL_SIZE = "qpid.port.amqp.threadPool.size";
+ String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "qpid.port.amqp.threadPool.keep_alive_timeout";
+
+ String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -71,6 +74,10 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)
long DEFAULT_PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_AMQP_NUMBER_OF_SELECTORS)
+ long DEFAULT_PORT_AMQP_NUMBER_OF_SELECTORS = Math.max(DEFAULT_PORT_AMQP_THREAD_POOL_SIZE / 8, 1);
+
String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
@ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
@@ -93,6 +100,9 @@ public interface AmqpPort<X extends Amqp
@ManagedAttribute( defaultValue = "${" + PORT_AMQP_THREAD_POOL_SIZE + "}")
int getThreadPoolSize();
+ @ManagedAttribute( defaultValue = "${" + PORT_AMQP_NUMBER_OF_SELECTORS + "}")
+ int getNumberOfSelectors();
+
@ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
boolean getNeedClientAuth();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Mon Oct 26 17:52:37 2015
@@ -117,6 +117,9 @@ public class AmqpPortImpl extends Abstra
@ManagedAttributeField
private int _threadPoolSize;
+ @ManagedAttributeField
+ private int _numberOfSelectors;
+
private final AtomicInteger _connectionCount = new AtomicInteger();
private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
@@ -140,6 +143,13 @@ public class AmqpPortImpl extends Abstra
}
@Override
+ public int getNumberOfSelectors()
+ {
+ return _numberOfSelectors;
+ }
+
+
+ @Override
public SSLContext getSSLContext()
{
return _sslContext;
@@ -322,7 +332,7 @@ public class AmqpPortImpl extends Abstra
public void onValidate()
{
super.onValidate();
- validateThreadPoolSize(this);
+ validateThreadPoolSettings(this);
}
@Override
@@ -330,17 +340,25 @@ public class AmqpPortImpl extends Abstra
{
super.validateChange(proxyForValidation, changedAttributes);
AmqpPort changed = (AmqpPort) proxyForValidation;
- if (changedAttributes.contains(THREAD_POOL_SIZE))
+ if (changedAttributes.contains(THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
{
- validateThreadPoolSize(changed);
+ validateThreadPoolSettings(changed);
}
}
- private void validateThreadPoolSize(final AmqpPort changed)
+ /**
+ * Validates the thread pool size and the number of selectors of the given port.
+ *
+ * @param changed the port (or its validation proxy) whose settings are checked
+ * @throws IllegalConfigurationException if either value is less than one, or if the
+ * thread pool size is not strictly greater than the number of selectors
+ */
+ private void validateThreadPoolSettings(final AmqpPort changed)
{
if (changed.getThreadPoolSize() < 1)
{
- throw new IllegalConfigurationException(String.format("Thread pool size %d is too small. Must be greater than zero.", changed.getThreadPoolSize()));
+ throw new IllegalConfigurationException(String.format("Thread pool size %d on Port %s must be greater than zero.", changed.getThreadPoolSize(), getName()));
+ }
+ if (changed.getNumberOfSelectors() < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Number of Selectors %d on Port %s must be greater than zero.", changed.getNumberOfSelectors(), getName()));
+ }
+ // The thread pool must be strictly larger than the number of selectors.
+ if (changed.getThreadPoolSize() <= changed.getNumberOfSelectors())
+ {
+ throw new IllegalConfigurationException(String.format("Number of Selectors %d on Port %s must be less than the thread pool size %d.", changed.getNumberOfSelectors(), getName(), changed.getThreadPoolSize()));
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Oct 26 17:52:37 2015
@@ -43,13 +43,14 @@ public class NetworkConnectionScheduler
private final int _poolSize;
private final long _threadKeepAliveTimeout;
private final String _name;
+ private final int _numberOfSelectors;
private SelectorThread _selectorThread;
public NetworkConnectionScheduler(final String name,
- int threadPoolSize,
+ final int numberOfSelectors, int threadPoolSize,
long threadKeepAliveTimeout)
{
- this(name, threadPoolSize, threadKeepAliveTimeout, new ThreadFactory()
+ this(name, numberOfSelectors, threadPoolSize, threadKeepAliveTimeout, new ThreadFactory()
{
final AtomicInteger _count = new AtomicInteger();
@@ -64,7 +65,7 @@ public class NetworkConnectionScheduler
}
public NetworkConnectionScheduler(String name,
- int threadPoolSize,
+ final int numberOfSelectors, int threadPoolSize,
long threadKeepAliveTimeout,
ThreadFactory factory)
{
@@ -72,6 +73,7 @@ public class NetworkConnectionScheduler
_poolSize = threadPoolSize;
_threadKeepAliveTimeout = threadKeepAliveTimeout;
_factory = factory;
+ _numberOfSelectors = numberOfSelectors;
}
@@ -79,7 +81,7 @@ public class NetworkConnectionScheduler
{
try
{
- _selectorThread = new SelectorThread(this);
+ _selectorThread = new SelectorThread(this, _numberOfSelectors);
_executor = new ThreadPoolExecutor(_poolSize, _poolSize,
_threadKeepAliveTimeout, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), _factory);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Oct 26 17:52:37 2015
@@ -334,12 +334,11 @@ class SelectorThread extends Thread
private SelectionTask[] _selectionTasks;
- SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
+ SelectorThread(final NetworkConnectionScheduler scheduler, final int numberOfSelectors) throws IOException
{
_scheduler = scheduler;
- int selectors = Math.max(scheduler.getPoolSize()/8,1);
- _selectionTasks = new SelectionTask[selectors];
- for(int i = 0; i < selectors; i++)
+ _selectionTasks = new SelectionTask[numberOfSelectors];
+ for(int i = 0; i < numberOfSelectors; i++)
{
_selectionTasks[i] = new SelectionTask();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Oct 26 17:52:37 2015
@@ -72,7 +72,7 @@ class TCPandSSLTransport implements Acce
long threadPoolKeepAliveTimeout = _port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT);
- _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),
+ _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(), _port.getNumberOfSelectors(),
_port.getThreadPoolSize(), threadPoolKeepAliveTimeout);
_scheduler.start();
_networkTransport = new NonBlockingNetworkTransport(protocolEngineFactory,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Oct 26 17:52:37 2015
@@ -101,7 +101,6 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, EventListener
@@ -186,6 +185,9 @@ public abstract class AbstractVirtualHos
private int _connectionThreadPoolSize;
@ManagedAttributeField
+ private int _numberOfSelectors;
+
+ @ManagedAttributeField
private List<String> _enabledConnectionValidators;
@ManagedAttributeField
@@ -282,7 +284,7 @@ public abstract class AbstractVirtualHos
}
}
- if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE))
+ if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
{
validateConnectionThreadPoolSettings(virtualHost);
}
@@ -314,7 +316,15 @@ public abstract class AbstractVirtualHos
{
if (virtualHost.getConnectionThreadPoolSize() < 1)
{
- throw new IllegalConfigurationException(String.format("Thread pool size %d is too small. Must be greater than zero.", virtualHost.getConnectionThreadPoolSize()));
+ throw new IllegalConfigurationException(String.format("Thread pool size %d on VirtualHost %s must be greater than zero.", virtualHost.getConnectionThreadPoolSize(), getName()));
+ }
+ if (virtualHost.getNumberOfSelectors() < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be greater than zero.", virtualHost.getNumberOfSelectors(), getName()));
+ }
+ // The connection thread pool must be strictly larger than the number of selectors.
+ if (virtualHost.getConnectionThreadPoolSize() <= virtualHost.getNumberOfSelectors())
+ {
+ throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be less than the connection pool size %d.", virtualHost.getNumberOfSelectors(), getName(), virtualHost.getConnectionThreadPoolSize()));
}
}
@@ -1481,6 +1491,12 @@ public abstract class AbstractVirtualHos
return _connectionThreadPoolSize;
}
+ @Override
+ public int getNumberOfSelectors()
+ {
+ return _numberOfSelectors;
+ }
+
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected ListenableFuture<Void> doStop()
{
@@ -1833,7 +1849,7 @@ public abstract class AbstractVirtualHos
return total;
}
- @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
+ @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
private ListenableFuture<Void> onActivate()
{
final SuppressingInheritedAccessControlContextThreadFactory housekeepingThreadFactory =
@@ -1896,6 +1912,7 @@ public abstract class AbstractVirtualHos
SecurityManager.getSystemTaskSubject("IO Pool", getPrincipal()));
_networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool",
+ getNumberOfSelectors(),
getConnectionThreadPoolSize(),
threadPoolKeepAliveTimeout,
connectionThreadFactory);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Mon Oct 26 17:52:37 2015
@@ -243,6 +243,12 @@ class RedirectingVirtualHostImpl
}
@Override
+ public int getNumberOfSelectors()
+ {
+ return 0;
+ }
+
+ @Override
public long getQueueCount()
{
return 0;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Mon Oct 26 17:52:37 2015
@@ -50,6 +50,7 @@ import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -261,15 +262,17 @@ public class VirtualHostTest extends Qpi
AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
assertEquals("Unexpected number of connections before connection registered",
- 0,
- virtualHost.getConnectionCount());
+ 0,
+ virtualHost.getConnectionCount());
AMQPConnection modelConnection = mock(AMQPConnection.class);
when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
virtualHost.registerConnection(modelConnection);
- assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount());
+ assertEquals("Unexpected number of connections after connection registered",
+ 1,
+ virtualHost.getConnectionCount());
virtualHost.delete();
assertEquals("Unexpected state", State.DELETED, virtualHost.getState());
@@ -295,7 +298,7 @@ public class VirtualHostTest extends Qpi
assertNotNull(queue.getId());
assertEquals(queueName, queue.getName());
- verify(_configStore).update(eq(true),matchesRecord(queue.getId(), queue.getType()));
+ verify(_configStore).update(eq(true), matchesRecord(queue.getId(), queue.getType()));
}
public void testCreateNonDurableQueue()
@@ -324,7 +327,8 @@ public class VirtualHostTest extends Qpi
String virtualHostName = getName();
VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
- doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(virtualHost);
+ doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(
+ virtualHost);
assertNull(virtualHost.getDescription());
@@ -348,7 +352,8 @@ public class VirtualHostTest extends Qpi
String virtualHostName = getName();
VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
- doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(virtualHost);
+ doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseUpdate(
+ virtualHost);
try
{
@@ -370,7 +375,8 @@ public class VirtualHostTest extends Qpi
String virtualHostName = getName();
VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
- doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(virtualHost);
+ doThrow(new AccessControlException("mocked ACL exception")).when(_mockSecurityManager).authoriseDelete(
+ virtualHost);
try
{
@@ -395,13 +401,83 @@ public class VirtualHostTest extends Qpi
verify(connection).block();
}
+ public void testCreateValidation() throws Exception
+ {
+ try
+ {
+ createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1"));
+ fail("Exception not thrown for negative number of selectors");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE, "-1"));
+ fail("Exception not thrown for negative connection thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ createVirtualHost(getTestName(), Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE));
+ fail("Exception not thrown for number of selectors equal to connection thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testChangeValidation() throws Exception
+ {
+ VirtualHost<?,?,?> virtualHost = createVirtualHost(getTestName());
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1"));
+ fail("Exception not thrown for negative number of selectors");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE,
+ "-1"));
+ fail("Exception not thrown for negative connection thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE));
+ fail("Exception not thrown for number of selectors equal to connection thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
private VirtualHost<?,?,?> createVirtualHost(final String virtualHostName)
{
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(VirtualHost.NAME, virtualHostName);
- attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+ return createVirtualHost(virtualHostName, Collections.<String, Object>emptyMap());
+ }
+
+ private VirtualHost<?,?,?> createVirtualHost(final String virtualHostName, Map<String,Object> attributes)
+ {
+ Map<String, Object> vhAttributes = new HashMap<>();
+ vhAttributes.put(VirtualHost.NAME, virtualHostName);
+ vhAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+ vhAttributes.putAll(attributes);
- TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode);
+ TestMemoryVirtualHost host = new TestMemoryVirtualHost(vhAttributes, _virtualHostNode);
host.addChangeListener(_storeConfigurationChangeListener);
host.create();
// Fire the child added event on the node
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java Mon Oct 26 17:52:37 2015
@@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Auth
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -53,7 +54,6 @@ public class AmqpPortImplTest extends Qp
private static final String AUTHENTICATION_PROVIDER_NAME = "test";
private TaskExecutor _taskExecutor;
private Broker _broker;
- private ServerSocket _socket;
private AmqpPortImpl _port;
@Override
@@ -84,10 +84,6 @@ public class AmqpPortImplTest extends Qp
{
try
{
- if (_socket != null)
- {
- _socket.close();
- }
_taskExecutor.stop();
}
finally
@@ -104,28 +100,106 @@ public class AmqpPortImplTest extends Qp
}
}
- public void testValidateOnCreate() throws Exception
+ public void testOnCreateValidation() throws Exception
{
- _socket = openSocket();
-
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(AmqpPort.PORT, _socket.getLocalPort());
- attributes.put(AmqpPort.NAME, getTestName());
- attributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
- _port = new AmqpPortImpl(attributes, _broker);
+ ServerSocket socket = openSocket();
try
{
- _port.create();
+ createPort(getTestName(), Collections.<String,Object>singletonMap(AmqpPort.PORT, socket.getLocalPort()));
fail("Creation should fail due to validation check");
}
catch (IllegalConfigurationException e)
{
assertEquals("Unexpected exception message",
- String.format("Cannot bind to port %d and binding address '%s'. Port is already is use.",
- _socket.getLocalPort(), "*"), e.getMessage());
+ String.format("Cannot bind to port %d and binding address '%s'. Port is already is use.",
+ socket.getLocalPort(), "*"), e.getMessage());
+ }
+ finally
+ {
+ socket.close();
+ }
+
+ try
+ {
+ createPort(getTestName(), Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS, "-1"));
+ fail("Exception not thrown for negative number of selectors");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ createPort(getTestName(), Collections.<String, Object>singletonMap(AmqpPort.THREAD_POOL_SIZE, "-1"));
+ fail("Exception not thrown for negative thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ createPort(getTestName(),
+ Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS,
+ AmqpPort.DEFAULT_PORT_AMQP_THREAD_POOL_SIZE));
+ fail("Exception not thrown for number of selectors equal to thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testOnChangeThreadPoolValidation() throws Exception
+ {
+ _port = createPort(getTestName());
+ try
+ {
+ _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS, "-1"));
+ fail("Exception not thrown for negative number of selectors");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.THREAD_POOL_SIZE, "-1"));
+ fail("Exception not thrown for negative thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ try
+ {
+ _port.setAttributes(Collections.<String, Object>singletonMap(AmqpPort.NUMBER_OF_SELECTORS,
+ AmqpPort.DEFAULT_PORT_AMQP_THREAD_POOL_SIZE));
+ fail("Exception not thrown for number of selectors equal to thread pool size");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
}
}
+ private AmqpPortImpl createPort(final String portName)
+ {
+ return createPort(portName, Collections.<String, Object>emptyMap());
+ }
+
+ private AmqpPortImpl createPort(final String portName, final Map<String, Object> attributes)
+ {
+ Map<String, Object> portAttributes = new HashMap<>();
+ portAttributes.put(AmqpPort.PORT, 0);
+ portAttributes.put(AmqpPort.NAME, portName);
+ portAttributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+ portAttributes.putAll(attributes);
+ AmqpPortImpl port = new AmqpPortImpl(portAttributes, _broker);
+ port.create();
+ return port;
+ }
+
private ServerSocket openSocket() throws IOException
{
ServerSocket serverSocket = new ServerSocket();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1710666&r1=1710665&r2=1710666&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Mon Oct 26 17:52:37 2015
@@ -110,6 +110,7 @@ public class TCPandSSLTransportTest exte
when(port.getNetworkBufferSize()).thenReturn(64*1024);
when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
when(port.getThreadPoolSize()).thenReturn(2);
+ when(port.getNumberOfSelectors()).thenReturn(1);
when(port.getSSLContext()).thenReturn(sslContext);
when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org