You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/11 13:38:10 UTC
svn commit: r1695267 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/model/port/
broker-core/src/test/java/org/apache/qpid/server/model/port/
broker-plugins/management-http...
Author: kwall
Date: Tue Aug 11 11:38:09 2015
New Revision: 1695267
URL: http://svn.apache.org/r1695267
Log:
QPID-6683: [Java Broker] Give HTTP management ports separate individually configurable thread pools
* The number of acceptor threads no longer defaults to a value derived from the number of cores.
Work by Lorenz Quack <qu...@gmail.com> and Keith Wall <kw...@apache.org>
Added:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java Tue Aug 11 11:38:09 2015
@@ -286,6 +286,44 @@ abstract class AttributeValueConverter<T
}
}
};
+
+    /**
+     * Converter yielding {@link Double} attribute values.
+     * <p>
+     * {@code null} is passed through unchanged, a {@link Double} is returned as is, any other
+     * {@link Number} is converted with {@code doubleValue()}, and a {@link String} is first
+     * interpolated against the supplied object and then parsed. Every other type is rejected
+     * with an {@link IllegalArgumentException}.
+     */
+    static final AttributeValueConverter<Double> DOUBLE_CONVERTER = new AttributeValueConverter<Double>()
+    {
+        @Override
+        public Double convert(final Object value, final ConfiguredObject object)
+        {
+            if (value == null)
+            {
+                return null;
+            }
+            if (value instanceof Double)
+            {
+                return (Double) value;
+            }
+            if (value instanceof Number)
+            {
+                return ((Number) value).doubleValue();
+            }
+            if (!(value instanceof String))
+            {
+                throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Double");
+            }
+
+            // Strings may contain ${...} references; resolve them before parsing.
+            final String text = AbstractConfiguredObject.interpolate(object, (String) value);
+            try
+            {
+                return Double.valueOf(text);
+            }
+            catch (NumberFormatException nfe)
+            {
+                throw new IllegalArgumentException("Cannot convert string '" + text + "' to a Double", nfe);
+            }
+        }
+    };
+
static final AttributeValueConverter<Boolean> BOOLEAN_CONVERTER = new AttributeValueConverter<Boolean>()
{
@@ -465,6 +503,10 @@ abstract class AttributeValueConverter<T
{
return (AttributeValueConverter<X>) LONG_CONVERTER;
}
+ else if(type == Double.class)
+ {
+ return (AttributeValueConverter<X>) DOUBLE_CONVERTER;
+ }
else if(type == Boolean.class)
{
return (AttributeValueConverter<X>) BOOLEAN_CONVERTER;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java Tue Aug 11 11:38:09 2015
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -32,16 +33,19 @@ import org.apache.qpid.server.model.Trus
@ManagedObject( category = false, type = "HTTP")
public interface HttpPort<X extends HttpPort<X>> extends ClientAuthCapablePort<X>
{
- String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
- String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
+ String THREAD_POOL_MINIMUM = "threadPoolMinimum";
+ String THREAD_POOL_MAXIMUM = "threadPoolMaximum";
+
+ String DEFAULT_HTTP_NEED_CLIENT_AUTH = "false";
+ String DEFAULT_HTTP_WANT_CLIENT_AUTH = "false";
@ManagedAttribute(defaultValue = "*")
String getBindingAddress();
- @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
+ @ManagedAttribute( defaultValue = DEFAULT_HTTP_NEED_CLIENT_AUTH)
boolean getNeedClientAuth();
- @ManagedAttribute( defaultValue = DEFAULT_AMQP_WANT_CLIENT_AUTH )
+ @ManagedAttribute( defaultValue = DEFAULT_HTTP_WANT_CLIENT_AUTH)
boolean getWantClientAuth();
@ManagedAttribute
@@ -59,4 +63,31 @@ public interface HttpPort<X extends Http
Set<Protocol> getProtocols();
void setPortManager(PortManager manager);
+
+ // Name of the context variable supplying the default for the threadPoolMaximum attribute.
+ String PORT_HTTP_THREAD_POOL_MAXIMUM = "port.http.threadPool.maximum";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_HTTP_THREAD_POOL_MAXIMUM )
+ long DEFAULT_PORT_HTTP_THREAD_POOL_MAXIMUM = 24;
+
+ // Upper thread pool limit for this port; defaults to the context variable above.
+ // HttpPortImpl rejects values below 1 and values smaller than the minimum.
+ @ManagedAttribute( defaultValue = "${" + PORT_HTTP_THREAD_POOL_MAXIMUM + "}")
+ int getThreadPoolMaximum();
+
+ // Name of the context variable supplying the default for the threadPoolMinimum attribute.
+ String PORT_HTTP_THREAD_POOL_MINIMUM = "port.http.threadPool.minimum";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_HTTP_THREAD_POOL_MINIMUM )
+ long DEFAULT_PORT_HTTP_THREAD_POOL_MINIMUM = 8;
+
+ // Lower thread pool limit for this port; defaults to the context variable above.
+ // HttpPortImpl rejects values below 1 and values greater than the maximum.
+ @ManagedAttribute( defaultValue = "${" + PORT_HTTP_THREAD_POOL_MINIMUM + "}")
+ int getThreadPoolMinimum();
+
+ // Context variable (no matching attribute) whose value HttpManagement adds to both the
+ // minimum and the maximum when sizing the connector's thread pool.
+ // NOTE(review): the default is declared as long but consumers read it as Integer - confirm
+ // the context value conversion makes the declared type irrelevant.
+ String PORT_HTTP_ADDITIONAL_INTERNAL_THREADS = "port.http.additionalInternalThreads";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_HTTP_ADDITIONAL_INTERNAL_THREADS )
+ long DEFAULT_PORT_HTTP_ADDITIONAL_INTERNAL_THREADS = 5;
+
+ // Context variable (no matching attribute) that HttpManagement passes to the connector
+ // thread pool as its maximum number of queued jobs.
+ String PORT_HTTP_MAXIMUM_QUEUED_REQUESTS = "port.http.maximumQueuedRequests";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_HTTP_MAXIMUM_QUEUED_REQUESTS )
+ long DEFAULT_PORT_HTTP_MAXIMUM_QUEUED_REQUESTS = 1000;
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java Tue Aug 11 11:38:09 2015
@@ -21,9 +21,11 @@
package org.apache.qpid.server.model.port;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
@@ -36,6 +38,12 @@ public class HttpPortImpl extends Abstra
@ManagedAttributeField
private String _bindingAddress;
+ @ManagedAttributeField
+ private int _threadPoolMaximum;
+
+ @ManagedAttributeField
+ private int _threadPoolMinimum;
+
@ManagedObjectFactoryConstructor
public HttpPortImpl(final Map<String, Object> attributes,
final Broker<?> broker)
@@ -56,6 +64,18 @@ public class HttpPortImpl extends Abstra
}
+ /** Returns the threadPoolMaximum managed attribute as held in the backing field. */
@Override
+ public int getThreadPoolMaximum()
+ {
+ return _threadPoolMaximum;
+ }
+
+ /** Returns the threadPoolMinimum managed attribute as held in the backing field. */
+ @Override
+ public int getThreadPoolMinimum()
+ {
+ return _threadPoolMinimum;
+ }
+
+ @Override
protected State onActivate()
{
if(_portManager != null && _portManager.isActivationAllowed(this))
@@ -78,5 +98,58 @@ public class HttpPortImpl extends Abstra
throw new IllegalConfigurationException(String.format("Cannot bind to port %d and binding address '%s'. Port is already is use.",
getPort(), bindingAddress == null || "".equals(bindingAddress) ? "*" : bindingAddress));
}
+
+ if (_threadPoolMaximum < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Thread pool maximum %d is too small. Must be greater than zero.", _threadPoolMaximum));
+ }
+ if (_threadPoolMinimum < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Thread pool minimum %d is too small. Must be greater than zero.", _threadPoolMinimum));
+ }
+ if (_threadPoolMinimum > _threadPoolMaximum)
+ {
+ throw new IllegalConfigurationException(String.format("Thread pool minimum %d cannot be greater than thread pool maximum %d.", _threadPoolMinimum, _threadPoolMaximum));
+ }
+
+ final double additionalInternalThreads = getContextValue(Integer.class, HttpPort.PORT_HTTP_ADDITIONAL_INTERNAL_THREADS);
+ if (additionalInternalThreads < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Number of additional internal threads %d is too small. Must be greater than zero.", additionalInternalThreads));
+ }
+
+ final double maximumQueuedRequests = getContextValue(Integer.class, HttpPort.PORT_HTTP_MAXIMUM_QUEUED_REQUESTS);
+ if (maximumQueuedRequests < 1)
+ {
+ throw new IllegalConfigurationException(String.format("Number of additional internal threads %d is too small. Must be greater than zero.", maximumQueuedRequests));
+ }
+ }
+
+    /**
+     * Validates a proposed change to this port's attributes.
+     * <p>
+     * After delegating to the superclass, the thread pool limits of the proposed state are
+     * checked whenever one of them is among the changed attributes: each changed limit must be
+     * at least 1, and the minimum must not exceed the maximum. All checks and all error
+     * messages use the values of the proposed state, not the current ones.
+     *
+     * @param proxyForValidation view of this port with the proposed attribute values applied
+     * @param changedAttributes names of the attributes being changed
+     * @throws IllegalConfigurationException if the proposed thread pool limits are invalid
+     */
+    @Override
+    protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+    {
+        super.validateChange(proxyForValidation, changedAttributes);
+        final HttpPort<?> changed = (HttpPort<?>) proxyForValidation;
+        final boolean maximumChanged = changedAttributes.contains(HttpPort.THREAD_POOL_MAXIMUM);
+        final boolean minimumChanged = changedAttributes.contains(HttpPort.THREAD_POOL_MINIMUM);
+
+        if (maximumChanged && changed.getThreadPoolMaximum() < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool maximum %d is too small. Must be greater than zero.", changed.getThreadPoolMaximum()));
+        }
+        // The minimum check must look at the proposed minimum, not the maximum.
+        if (minimumChanged && changed.getThreadPoolMinimum() < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool minimum %d is too small. Must be greater than zero.", changed.getThreadPoolMinimum()));
+        }
+        if ((maximumChanged || minimumChanged) && changed.getThreadPoolMinimum() > changed.getThreadPoolMaximum())
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool minimum %d cannot be greater than thread pool maximum %d.", changed.getThreadPoolMinimum(), changed.getThreadPoolMaximum()));
+        }
     }
}
Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java?rev=1695267&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java Tue Aug 11 11:38:09 2015
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.model.port;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Exercises the thread pool limit validation of {@link HttpPortImpl}, both when a port is
+ * created and when an existing port is updated.
+ */
+public class HttpPortImplTest extends QpidTestCase
+{
+    private static final String AUTHENTICATION_PROVIDER_NAME = "test";
+
+    private TaskExecutor _taskExecutor;
+    private Broker _broker;
+
+    /** Prepares a mocked broker that exposes one authentication provider named "test". */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+        Model brokerModel = BrokerModel.getInstance();
+
+        _broker = mock(Broker.class);
+        when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(_broker.getChildExecutor()).thenReturn(_taskExecutor);
+        when(_broker.getModel()).thenReturn(brokerModel);
+        when(_broker.getEventLogger()).thenReturn(new EventLogger());
+        when(_broker.getCategoryClass()).thenReturn(Broker.class);
+        when(_broker.getSecurityManager()).thenReturn(new SecurityManager(_broker, false));
+
+        AuthenticationProvider<?> authProvider = mock(AuthenticationProvider.class);
+        when(authProvider.getName()).thenReturn(AUTHENTICATION_PROVIDER_NAME);
+        when(authProvider.getParent(Broker.class)).thenReturn(_broker);
+        when(authProvider.getMechanisms()).thenReturn(Arrays.asList("PLAIN"));
+        when(_broker.getChildren(AuthenticationProvider.class)).thenReturn(Collections.<AuthenticationProvider>singleton(
+                authProvider));
+        when(_broker.getChildByName(AuthenticationProvider.class, AUTHENTICATION_PROVIDER_NAME)).thenReturn(authProvider);
+    }
+
+    /** Creating a port whose minimum exceeds its maximum must be rejected. */
+    public void testCreateWithIllegalThreadPoolValues() throws Exception
+    {
+        final Map<String, Object> creationAttributes = mandatoryAttributes();
+        creationAttributes.putAll(invertedThreadPoolLimits());
+
+        final HttpPortImpl httpPort = new HttpPortImpl(creationAttributes, _broker);
+        try
+        {
+            httpPort.create();
+            fail("Creation should fail due to validation check");
+        }
+        catch (IllegalConfigurationException expected)
+        {
+            // PASS
+        }
+    }
+
+    /** Updating a valid port so that its minimum exceeds its maximum must be rejected. */
+    public void testChangeWithIllegalThreadPoolValues() throws Exception
+    {
+        final HttpPortImpl httpPort = new HttpPortImpl(mandatoryAttributes(), _broker);
+        httpPort.create();
+
+        final Map<String, Object> illegalUpdate = invertedThreadPoolLimits();
+        try
+        {
+            httpPort.setAttributes(illegalUpdate);
+            fail("Change should fail due to validation check");
+        }
+        catch (IllegalConfigurationException expected)
+        {
+            // PASS
+        }
+    }
+
+    /** Attributes every port in these tests needs: port number, name and authentication provider. */
+    private Map<String, Object> mandatoryAttributes()
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put(HttpPort.PORT, 10000);
+        attributes.put(HttpPort.NAME, getTestName());
+        attributes.put(HttpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+        return attributes;
+    }
+
+    /** Thread pool limits with the minimum (51) above the maximum (50). */
+    private Map<String, Object> invertedThreadPoolLimits()
+    {
+        final Map<String, Object> limits = new HashMap<>();
+        limits.put(HttpPort.THREAD_POOL_MINIMUM, 51);
+        limits.put(HttpPort.THREAD_POOL_MAXIMUM, 50);
+        return limits;
+    }
+
+}
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java Tue Aug 11 11:38:09 2015
@@ -28,7 +28,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -57,6 +56,7 @@ import org.eclipse.jetty.servlet.Servlet
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,13 +144,12 @@ public class HttpManagement extends Abst
{
getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
- Collection<Port<?>> httpPorts = getHttpPorts(getBroker().getPorts());
- Map<Port<?>, Connector> connectors = new HashMap<>();
- _server = createServer(httpPorts, connectors);
+ Collection<HttpPort<?>> httpPorts = getHttpPorts(getBroker().getPorts());
+ _server = createServer(httpPorts);
try
{
_server.start();
- logOperationalListenMessages(httpPorts, connectors);
+ logOperationalListenMessages(httpPorts);
}
catch (Exception e)
{
@@ -186,7 +185,7 @@ public class HttpManagement extends Abst
return _sessionTimeout;
}
- private Server createServer(Collection<Port<?>> ports, final Map<Port<?>, Connector> connectors)
+ private Server createServer(Collection<HttpPort<?>> ports)
{
if (_logger.isInfoEnabled())
{
@@ -195,75 +194,20 @@ public class HttpManagement extends Abst
_allowPortActivation = true;
Server server = new Server();
-
- QueuedThreadPool threadPool = new QueuedThreadPool();
- threadPool.setName("HttpManagement");
- threadPool.setMaxQueued(getContextValue(Integer.class, JETTY_THREAD_POOL_MAX_QUEUED));
- threadPool.setMaxThreads(getContextValue(Integer.class, JETTY_THREAD_POOL_MAX_THREADS));
- threadPool.setMinThreads(getContextValue(Integer.class, JETTY_THREAD_POOL_MIN_THREADS));
-
- server.setThreadPool(threadPool);
+ // All connectors will have their own thread pool, so we expect the server to need none.
+ server.setThreadPool(new ZeroSizedThreadPool());
int lastPort = -1;
- for (Port<?> port : ports)
+ for (HttpPort<?> port : ports)
{
- if(port instanceof HttpPort)
+ if (!State.ACTIVE.equals(port.getDesiredState()))
{
-
- if (!State.ACTIVE.equals(port.getDesiredState()))
- {
- continue;
- }
- ((HttpPort<?>)port).setPortManager(this);
-
- if(port.getState() != State.ACTIVE)
- {
-
- // TODO - RG - probably does nothing
- port.startAsync();
- }
- Connector connector = null;
-
- Collection<Transport> transports = port.getTransports();
- if (!transports.contains(Transport.SSL))
- {
- final Port thePort = port;
- connector = new SelectChannelConnector()
- {
- @Override
- public void customize(final EndPoint endpoint, final Request request) throws IOException
- {
- super.customize(endpoint, request);
- request.setAttribute(PORT_SERVLET_ATTRIBUTE, thePort);
- }
- };
- }
- else if (transports.contains(Transport.SSL))
- {
- connector = createSslConnector((HttpPort<?>) port);
- }
- else
- {
- throw new IllegalArgumentException("Unexpected transport on port "
- + port.getName()
- + ":"
- + transports);
- }
- lastPort = port.getPort();
- String bindingAddress = ((HttpPort)port).getBindingAddress();
- if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
- {
- connector.setHost(bindingAddress.trim());
- }
- connector.setPort(port.getPort());
- server.addConnector(connector);
- connectors.put(port, connector);
- }
- else
- {
- throw new IllegalArgumentException("Http management can only be added to an Http port");
+ continue;
}
+ SelectChannelConnector connector = createConnector(port);
+ server.addConnector(connector);
+ lastPort = port.getPort();
}
_allowPortActivation = false;
@@ -344,9 +288,72 @@ public class HttpManagement extends Abst
return server;
}
- private Connector createSslConnector(final HttpPort<?> port)
+ /**
+ * Builds the Jetty connector for a single HTTP port and gives it a thread pool of its own.
+ * Registers this plugin as the port's manager, chooses a plain or an SSL connector from the
+ * port's transports, applies the binding address and port number, and sizes the pool from the
+ * port's thread pool attributes and context variables.
+ *
+ * @param port the HTTP port to expose
+ * @return the configured connector; it has not been added to any server
+ */
+ private SelectChannelConnector createConnector(final HttpPort<?> port)
{
- final Connector connector;
+ port.setPortManager(this);
+
+ if(port.getState() != State.ACTIVE)
+ {
+ // TODO - RG - probably does nothing
+ port.startAsync();
+ }
+ SelectChannelConnector connector = null;
+
+ Collection<Transport> transports = port.getTransports();
+ if (!transports.contains(Transport.SSL))
+ {
+ final Port thePort = port;
+ // Plain connector: tag every request with the port it arrived on.
+ connector = new SelectChannelConnector()
+ {
+ @Override
+ public void customize(final EndPoint endpoint, final Request request) throws IOException
+ {
+ super.customize(endpoint, request);
+ request.setAttribute(PORT_SERVLET_ATTRIBUTE, thePort);
+ }
+ };
+ }
+ else if (transports.contains(Transport.SSL))
+ {
+ connector = createSslConnector(port);
+ }
+ else
+ {
+ // NOTE(review): the two conditions above are complementary, so this branch looks unreachable.
+ throw new IllegalArgumentException("Unexpected transport on port "
+ + port.getName()
+ + ":"
+ + transports);
+ }
+ String bindingAddress = port.getBindingAddress();
+ // A null, blank or "*" binding address leaves the connector host unset.
+ if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
+ {
+ connector.setHost(bindingAddress.trim());
+ }
+ connector.setPort(port.getPort());
+
+
+ QueuedThreadPool threadPool = new QueuedThreadPool();
+ threadPool.setName("HttpManagement-" + port.getName());
+
+ int additionalInternalThreads = port.getContextValue(Integer.class,
+ HttpPort.PORT_HTTP_ADDITIONAL_INTERNAL_THREADS);
+ int maximumQueueRequests = port.getContextValue(Integer.class, HttpPort.PORT_HTTP_MAXIMUM_QUEUED_REQUESTS);
+
+ int threadPoolMaximum = port.getThreadPoolMaximum();
+ int threadPoolMinimum = port.getThreadPoolMinimum();
+
+ threadPool.setMaxQueued(maximumQueueRequests);
+ // Both limits are raised by the additional internal thread count.
+ // NOTE(review): presumably this reserves threads for Jetty's own use - confirm.
+ threadPool.setMaxThreads(threadPoolMaximum + additionalInternalThreads);
+ threadPool.setMinThreads(threadPoolMinimum + additionalInternalThreads);
+
+ // Half the configured maximum, but never fewer than one acceptor.
+ connector.setAcceptors(Math.max(1, threadPoolMaximum / 2));
+ connector.setThreadPool(threadPool);
+ return connector;
+ }
+
+ private SelectChannelConnector createSslConnector(final HttpPort<?> port)
+ {
+ final SelectChannelConnector connector;
KeyStore keyStore = port.getKeyStore();
Collection<TrustStore> trustStores = port.getTrustStores();
if (keyStore == null)
@@ -530,14 +537,14 @@ public class HttpManagement extends Abst
return "v"+String.valueOf(BrokerModel.MODEL_MAJOR_VERSION);
}
- private void logOperationalListenMessages(Collection<Port<?>> ports, final Map<Port<?>, Connector> connectors)
+ /**
+ * Emits one LISTENING operational log message for every transport of every given port.
+ *
+ * @param ports the HTTP ports to report
+ */
+ private void logOperationalListenMessages(Collection<HttpPort<?>> ports)
{
for (Port port : ports)
{
Set<Transport> transports = port.getTransports();
for (Transport transport: transports)
{
- getBroker().getEventLogger().message(ManagementConsoleMessages.LISTENING(Protocol.HTTP.name(), transport.name(), connectors.get(port).getLocalPort()));
+ // NOTE(review): this reports the port number from the port object, no longer the
+ // connector's local port - confirm they cannot differ (e.g. a port of 0).
+ getBroker().getEventLogger().message(ManagementConsoleMessages.LISTENING(Protocol.HTTP.name(), transport.name(), port.getPort()));
}
}
}
@@ -552,14 +559,14 @@ public class HttpManagement extends Abst
}
- private Collection<Port<?>> getHttpPorts(Collection<Port<?>> ports)
+ private Collection<HttpPort<?>> getHttpPorts(Collection<Port<?>> ports)
{
- Collection<Port<?>> httpPorts = new HashSet<>();
+ Collection<HttpPort<?>> httpPorts = new HashSet<>();
for (Port<?> port : ports)
{
if (port.getState() != State.ERRORED && port.getProtocols().contains(Protocol.HTTP))
{
- httpPorts.add(port);
+ httpPorts.add((HttpPort<?>) port);
}
}
return httpPorts;
@@ -631,4 +638,35 @@ public class HttpManagement extends Abst
}
}
+    /**
+     * Placeholder {@link ThreadPool} that owns no threads. It always reports zero threads,
+     * zero idle threads and no thread shortage, returns immediately from {@link #join()},
+     * and refuses every job handed to {@link #dispatch(Runnable)}.
+     */
+    private static class ZeroSizedThreadPool implements ThreadPool
+    {
+        /** There are never any threads in this pool. */
+        @Override
+        public int getThreads()
+        {
+            return 0;
+        }
+
+        /** With no threads at all, none can be idle. */
+        @Override
+        public int getIdleThreads()
+        {
+            return 0;
+        }
+
+        /** Never reports a shortage of threads. */
+        @Override
+        public boolean isLowOnThreads()
+        {
+            return false;
+        }
+
+        /** Rejects the job: nothing is expected to be dispatched to this pool. */
+        @Override
+        public boolean dispatch(final Runnable job)
+        {
+            throw new IllegalStateException("Job unexpectedly dispatched to server thread pool. Cannot dispatch");
+        }
+
+        /** Nothing to wait for, so this returns at once. */
+        @Override
+        public void join() throws InterruptedException
+        {
+        }
+    }
}
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java Tue Aug 11 11:38:09 2015
@@ -61,20 +61,5 @@ public interface HttpManagementConfigura
static final long DEFAULT_MAX_UPLOAD_SIZE = 100 * 1024;
- String JETTY_THREAD_POOL_MAX_QUEUED = "jetty.threadPool.maxQueued";
- @ManagedContextDefault( name = JETTY_THREAD_POOL_MAX_QUEUED)
- static final long DEFAULT_JETTY_THREAD_POOL_MAX_QUEUED = 1000;
-
- String JETTY_THREAD_POOL_MAX_THREADS = "jetty.threadPool.maxThreads";
- @ManagedContextDefault( name = JETTY_THREAD_POOL_MAX_THREADS)
- static final long DEFAULT_JETTY_THREAD_POOL_MAX_THREADS = 50;
-
-
- String JETTY_THREAD_POOL_MIN_THREADS = "jetty.threadPool.minThreads";
- @ManagedContextDefault( name = JETTY_THREAD_POOL_MIN_THREADS)
- static final long DEFAULT_JETTY_THREAD_POOL_MIN_THREADS = 5;
-
-
-
AuthenticationProvider getAuthenticationProvider(HttpServletRequest request);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org