You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/02/19 11:03:24 UTC
svn commit: r745799 [1/3] - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/ser...
Author: aidan
Date: Thu Feb 19 10:03:18 2009
New Revision: 745799
URL: http://svn.apache.org/viewvc?rev=745799&view=rev
Log:
QPID-1621: add ServerConfiguration, QueueConfiguration and SecurityConfiguration classes. Move almost all uses of o.a.commons.configuration.Configuration behind them.
@Configured delenda est
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java
- copied, changed from r745590, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java
- copied, changed from r745590, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
Removed:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/HeartbeatConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AsyncDeliveryConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
qpid/trunk/qpid/java/systests/build.xml
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Feb 19 10:03:18 2009
@@ -52,9 +52,9 @@
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.ManagedBroker;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.routing.RoutingTable;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 19 10:03:18 2009
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.server;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -29,12 +37,12 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -45,19 +53,15 @@
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.transactionlog.TransactionLog;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class AMQChannel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -114,9 +118,6 @@
public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
throws AMQException
{
- //Set values from configuration
- Configurator.configure(this);
-
_session = session;
_channelId = channelId;
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Feb 19 10:03:18 2009
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -27,36 +33,23 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.FixedSizeByteBufferAllocator;
+import org.apache.mina.common.IoAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.AMQException;
+import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.management.JMXManagedObjectRegistry;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQPProtocolProvider;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
-import org.apache.qpid.url.URLSyntaxException;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
/**
* Main entry point for AMQPD.
@@ -200,13 +193,6 @@
_brokerLogger.error("Initialisation Error : " + e.getMessage());
shutdown(1);
}
- catch (ConfigurationException e)
- {
- System.out.println("Error configuring message broker: " + e);
- _brokerLogger.error("Error configuring message broker: " + e);
- e.printStackTrace();
- shutdown(1);
- }
catch (Throwable e)
{
System.out.println("Error initialising message broker: " + e);
@@ -223,7 +209,7 @@
System.exit(status);
}
- protected void startup() throws InitException, ConfigurationException, Exception
+ protected void startup() throws Exception
{
final String QpidHome = System.getProperty(QPID_HOME);
final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
@@ -259,40 +245,32 @@
}
ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
-
-
- updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m"));
-
-
+ ServerConfiguration serverConfig = config.getConfiguration();
+ updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
ApplicationRegistry.initialise(config);
-
//fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
// that are causing the broker build to pick up the wrong properties file and hence say
// Starting Qpid Client
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- ConnectorConfiguration connectorConfig =
- ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
-
- ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
+ ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
- if (!connectorConfig.enablePooledAllocator)
+ if (!serverConfig.getEnablePooledAllocator())
{
ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
-
- if(connectorConfig.useBiasedWrites)
+ if(serverConfig.getUseBiasedWrites())
{
System.setProperty("org.apache.qpid.use_write_biased_pool","true");
}
- int port = connectorConfig.port;
+ int port = serverConfig.getPort();
String portStr = commandLine.getOptionValue("p");
if (portStr != null)
@@ -306,29 +284,8 @@
throw new InitException("Invalid port: " + portStr, e);
}
}
-
- String VIRTUAL_HOSTS = "virtualhosts";
-
- Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty(VIRTUAL_HOSTS);
-
- if (virtualHosts != null)
- {
- if (virtualHosts instanceof Collection)
- {
- int totalVHosts = ((Collection) virtualHosts).size();
- for (int vhost = 0; vhost < totalVHosts; vhost++)
- {
- setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
- }
- }
- else
- {
- setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
- }
- }
-
- bind(port, connectorConfig);
-
+
+ bind(port, serverConfig);
}
/**
@@ -336,86 +293,59 @@
* @param configuration
* @param managementPort The string from the command line
*/
- private void updateManagementPort(Configuration configuration, String managementPort)
+ private void updateManagementPort(ServerConfiguration configuration, String managementPort)
{
if (managementPort != null)
{
- int mport;
- int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH);
try
{
- mport = Integer.parseInt(managementPort);
- configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport);
+ configuration.setJMXManagementPort(Integer.parseInt(managementPort));
}
catch (NumberFormatException e)
{
- _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e);
+ _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
}
}
}
- protected void setupVirtualHosts(String configFileParent, String configFilePath)
- throws ConfigurationException, AMQException, URLSyntaxException
- {
- String configVar = "${conf}";
-
- if (configFilePath.startsWith(configVar))
- {
- configFilePath = configFileParent + configFilePath.substring(configVar.length());
- }
-
- if (configFilePath.indexOf(".xml") != -1)
- {
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
- vHostConfig.performBindings();
- }
- else
- {
- // the virtualhosts value is a path. Search it for XML files.
-
- File virtualHostDir = new File(configFilePath);
-
- String[] fileNames = virtualHostDir.list();
-
- for (int each = 0; each < fileNames.length; each++)
- {
- if (fileNames[each].endsWith(".xml"))
- {
- VirtualHostConfiguration vHostConfig =
- new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
- vHostConfig.performBindings();
- }
- }
- }
- }
-
- protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException
+ protected void bind(int port, ServerConfiguration config) throws BindException
{
String bindAddr = commandLine.getOptionValue("b");
if (bindAddr == null)
{
- bindAddr = connectorConfig.bindAddress;
+ bindAddr = config.getBind();
}
try
{
- // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
- IoAcceptor acceptor = connectorConfig.createAcceptor();
+ IoAcceptor acceptor;
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
+ {
+ _logger.warn("Using Qpid Multithreaded IO Processing");
+ acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
+ }
+ else
+ {
+ _logger.warn("Using Mina IO Processing");
+ acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
+ }
+
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
- sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
- sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
- sc.setTcpNoDelay(connectorConfig.tcpNoDelay);
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+ sc.setSendBufferSize(config.getWriteBufferSize());
+ sc.setTcpNoDelay(config.getTcpNoDelay());
// if we do not use the executor pool threading model we get the default leader follower
// implementation provided by MINA
- if (connectorConfig.enableExecutorPool)
+ if (config.getEnableExecutorPool())
{
sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
- if (!connectorConfig.enableSSL || !connectorConfig.sslOnly)
+ if (!config.getEnableSSL() || !config.getSSLOnly())
{
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
InetSocketAddress bindAddress;
@@ -434,16 +364,16 @@
_brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
- if (connectorConfig.enableSSL)
+ if (config.getEnableSSL())
{
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
try
{
- bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+ bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig);
//fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
+ _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
}
catch (IOException e)
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java (from r745590, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java&r1=745590&r2=745799&rev=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java Thu Feb 19 10:03:18 2009
@@ -18,13 +18,41 @@
* under the License.
*
*/
-package org.apache.qpid.server.management;
+package org.apache.qpid.server.configuration;
-import org.apache.qpid.configuration.Configured;
+import org.apache.commons.configuration.Configuration;
-public class ManagementConfiguration
+
+public class ExchangeConfiguration
{
- @Configured(path = "management.enabled",
- defaultValue = "true")
- public boolean enabled;
+
+ private Configuration _config;
+ private String _name;
+
+ public ExchangeConfiguration(String exchName, Configuration subset)
+ {
+ _name = exchName;
+ _config = subset;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getType()
+ {
+ return _config.getString("type","direct");
+ }
+
+ public boolean getDurable()
+ {
+ return _config.getBoolean("durable", false);
+ }
+
+ public boolean getAutoDelete()
+ {
+ return _config.getBoolean("autodelete",false);
+ }
+
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=745799&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Feb 19 10:03:18 2009
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import java.util.List;
+
+import org.apache.commons.configuration.Configuration;
+
+public class QueueConfiguration
+{
+
+ // FIXME AIDAN XXX -- deal with defaults
+
+ private Configuration _config;
+ private String _name;
+
+ public QueueConfiguration(String name, Configuration config)
+ {
+ _config = config;
+ _name = name;
+ }
+
+ public boolean getDurable()
+ {
+ return _config.getBoolean("durable" ,false);
+ }
+
+ public boolean getAutoDelete()
+ {
+ return _config.getBoolean("autodelete", false);
+ }
+
+ public String getOwner()
+ {
+ return _config.getString("owner", null);
+ }
+
+ public boolean getPriority()
+ {
+ return _config.getBoolean("priority", false);
+ }
+
+ public int getPriorities()
+ {
+ return _config.getInt("priorities", -1);
+ }
+
+ public String getExchange()
+ {
+ return _config.getString("exchange", null);
+ }
+
+ public List getRoutingKeys()
+ {
+ return _config.getList("routingKey");
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _config.getLong("maximumMessageAge", 0);
+ }
+
+ public long getMaximumQueueDepth()
+ {
+ return _config.getLong("maximumQueueDepth", 0);
+ }
+
+ public long getMaximumMessageSize()
+ {
+ return _config.getLong("maximumMessageSize", 0);
+ }
+
+ public long getMaximumMessageCount()
+ {
+ return _config.getLong("maximumMessageCount", 0);
+ }
+
+ public long getMinimumAlertRepeatGap()
+ {
+ return _config.getLong("minimumAlertRepeatGap", 0);
+ }
+
+}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java (from r745590, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java&r1=745590&r2=745799&rev=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java Thu Feb 19 10:03:18 2009
@@ -18,17 +18,24 @@
*
*
*/
-package org.apache.qpid.server.security.auth.database;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
+package org.apache.qpid.server.configuration;
-import java.util.Map;
+import org.apache.commons.configuration.Configuration;
-public interface PrincipalDatabaseManager
+public class SecurityConfiguration
{
- public Map<String, PrincipalDatabase> getDatabases();
- public void initialiseManagement(Configuration config) throws ConfigurationException;
+ private Configuration _conf;
+
+ public SecurityConfiguration(Configuration configuration)
+ {
+ _conf = configuration;
+ }
+
+ public Configuration getConfiguration()
+ {
+ return _conf;
+ }
+
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=745799&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Feb 19 10:03:18 2009
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+
+public class ServerConfiguration
+{
+
+ private static Configuration _config;
+
+ private static final int DEFAULT_FRAME_SIZE = 65536;
+ private static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
+ private static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
+ private static final int DEFAULT_PORT = 5672;
+ private static final int DEFAUL_SSL_PORT = 8672;
+ private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+ private static final int DEFAULT_JMXPORT = 8999;
+
+ private long _housekeepingExpiredMessageCheckPeriod = DEFAULT_HOUSEKEEPING_PERIOD;
+ private static int _jmxPort = DEFAULT_JMXPORT;
+
+ private Map<String, VirtualHostConfiguration> _virtualHosts = new HashMap<String, VirtualHostConfiguration>();
+ private SecurityConfiguration _securityConfiguration = null;
+
+ public ServerConfiguration(File configurationURL) throws ConfigurationException
+ {
+ this(config(configurationURL));
+ }
+
+ public ServerConfiguration(Configuration conf) throws ConfigurationException
+ {
+ _config = conf;
+ _jmxPort = _config.getInt("management.jmxport", 8999);
+
+ _securityConfiguration = new SecurityConfiguration(conf.subset("security"));
+
+ List vhosts = conf.getList("virtualhosts");
+ Iterator i = vhosts.iterator();
+ while (i.hasNext())
+ {
+ Object thing = i.next();
+ if (thing instanceof String)
+ {
+ XMLConfiguration vhostConfiguration = new XMLConfiguration((String) thing);
+ List hosts = vhostConfiguration.getList("virtualhost.name");
+ for (int j = 0; j < hosts.size(); j++)
+ {
+ String name = (String) hosts.get(j);
+ CompositeConfiguration mungedConf = new CompositeConfiguration();
+ mungedConf.addConfiguration(conf.subset("virtualhosts.virtualhost."+name));
+ mungedConf.addConfiguration(vhostConfiguration.subset("virtualhost." + name));
+ VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(name, mungedConf);
+ _virtualHosts.put(vhostConfig.getName(), vhostConfig);
+ }
+ }
+ }
+ }
+
+ public static String[] objListToStringArray(List objList)
+ {
+ String[] networkStrings = new String[objList.size()];
+ int i = 0;
+ for (Object network : objList)
+ {
+ networkStrings[i++] = (String) network;
+ }
+ return networkStrings;
+ }
+
+ // Our configuration class needs to make the interpolate method
+ // public so it can be called below from the config method.
+ private static class MyConfiguration extends CompositeConfiguration
+ {
+ public String interpolate(String obj)
+ {
+ return super.interpolate(obj);
+ }
+ }
+
+ private final static Configuration config(File url) throws ConfigurationException
+ {
+ // We have to override the interpolate methods so that
+ // interpolation takes place across the entirety of the
+ // composite configuration. Without doing this each
+ // configuration object only interpolates variables defined
+ // inside itself.
+ final MyConfiguration conf = new MyConfiguration();
+ conf.addConfiguration(new SystemConfiguration()
+ {
+ protected String interpolate(String o)
+ {
+ return conf.interpolate(o);
+ }
+ });
+ conf.addConfiguration(new XMLConfiguration(url)
+ {
+ protected String interpolate(String o)
+ {
+ return conf.interpolate(o);
+ }
+ });
+ return conf;
+ }
+
+ public void setJMXManagementPort(int mport)
+ {
+ _jmxPort = mport;
+ }
+
+ public int getJMXManagementPort()
+ {
+ return _jmxPort;
+ }
+
+ public boolean getPlatformMbeanserver()
+ {
+ return _config.getBoolean("management.platform-mbeanserver", true);
+ }
+
+ public String[] getVirtualHosts()
+ {
+ return _virtualHosts.keySet().toArray(new String[_virtualHosts.size()]);
+ }
+
+ public String getPluginDirectory()
+ {
+ return _config.getString("plugin-directory");
+ }
+
+ public VirtualHostConfiguration getVirtualHostConfig(String name)
+ {
+ return _virtualHosts.get(name);
+ }
+
+ public List<String> getPrincipalDatabaseNames()
+ {
+ return _config.getList("security.principal-databases.principal-database.name");
+ }
+
+ public List<String> getPrincipalDatabaseClass()
+ {
+ return _config.getList("security.principal-databases.principal-database.class");
+ }
+
+ public List<String> getPrincipalDatabaseAttributeNames(int index)
+ {
+ String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name";
+ return _config.getList(name);
+ }
+
+ public List<String> getPrincipalDatabaseAttributeValues(int index)
+ {
+ String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value";
+ return _config.getList(name);
+ }
+
+ public List<String> getManagementPrincipalDBs()
+ {
+ return _config.getList("security.jmx.principal-database");
+ }
+
+ public List<String> getManagementAccessList()
+ {
+ return _config.getList("security.jmx.access");
+ }
+
+ public int getFrameSize()
+ {
+ return _config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ }
+
+ public boolean getManagementSecurityEnabled()
+ {
+ return _config.getBoolean("management.security-enabled", false);
+ }
+
+ public boolean getProtectIOEnabled()
+ {
+ return _config.getBoolean("broker.connector.protectio.enabled", false);
+ }
+
+ public int getBufferReadLimit()
+ {
+ return _config.getInt("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE);
+ }
+
+ public int getBufferWriteLimit()
+ {
+ return _config.getInt("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
+ }
+
+ public boolean getSynchedClocks()
+ {
+ return _config.getBoolean("advanced.synced-clocks", false);
+ }
+
+ public boolean getMsgAuth()
+ {
+ return _config.getBoolean("security.msg-auth", false);
+ }
+
+ public String getJMXPrincipalDatabase()
+ {
+ return _config.getString("security.jmx.principal-database");
+ }
+
+ public String getManagementKeyStorePath()
+ {
+ return _config.getString("management.ssl.keyStorePath", null);
+ }
+
+ public boolean getManagementSSLEnabled()
+ {
+ return _config.getBoolean("management.ssl.enabled", true);
+ }
+
+ public String getManagementKeyStorePassword()
+ {
+ return _config.getString("management.ssl.keyStorePassword");
+ }
+
+ public SecurityConfiguration getSecurityConfiguration()
+ {
+ return _securityConfiguration;
+ }
+
+ public boolean getQueueAutoRegister()
+ {
+ return _config.getBoolean("queue.auto_register", true);
+ }
+
+ public boolean getManagementEnabled()
+ {
+ return _config.getBoolean("management.enabled", true);
+ }
+
+ public int getHeartBeatDelay()
+ {
+ return _config.getInt("heartbeat.delay", 5);
+ }
+
+ public double getHeartBeatTimeout()
+ {
+ return _config.getDouble("heartbeat.timeoutFactor", 2.0);
+ }
+
+ public int getDeliveryPoolSize()
+ {
+ return _config.getInt("delivery.poolsize", 0);
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _config.getLong("maximumMessageAge", 0);
+ }
+
+ public long getMaximumMessageCount()
+ {
+ return _config.getLong("maximumMessageCount", 0);
+ }
+
+ public long getMaximumQueueDepth()
+ {
+ return _config.getLong("maximumQueueDepth", 0);
+ }
+
+ public long getMaximumMessageSize()
+ {
+ return _config.getLong("maximumMessageSize", 0);
+ }
+
+ public long getMinimumAlertRepeatGap()
+ {
+ return _config.getLong("minimumAlertRepeatGap", 0);
+ }
+
+ public int getProcessors()
+ {
+ return _config.getInt("connector.processors", 4);
+ }
+
+ public int getPort()
+ {
+ return _config.getInt("connector.port", DEFAULT_PORT);
+ }
+
+ public String getBind()
+ {
+ return _config.getString("connector.bind", "wildcard");
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return _config.getInt("connector.socketReceiveBuffer", 32767);
+ }
+
+ public int getWriteBufferSize()
+ {
+ return _config.getInt("connector.socketWriteBuffer", 32767);
+ }
+
+ public boolean getTcpNoDelay()
+ {
+ return _config.getBoolean("connector.tcpNoDelay", true);
+ }
+
+ public boolean getEnableExecutorPool()
+ {
+ return _config.getBoolean("advanced.filterchain[@enableExecutorPool]", false);
+ }
+
+ public boolean getEnablePooledAllocator()
+ {
+ return _config.getBoolean("advanced.enablePooledAllocator", false);
+ }
+
+ public boolean getEnableDirectBuffers()
+ {
+ return _config.getBoolean("advanced.enableDirectBuffers", false);
+ }
+
+ public boolean getEnableSSL()
+ {
+ return _config.getBoolean("connector.ssl.enabled", false);
+ }
+
+ public boolean getSSLOnly()
+ {
+ return _config.getBoolean("connector.ssl.sslOnly", true);
+ }
+
+ public int getSSLPort()
+ {
+ return _config.getInt("connector.ssl.port", DEFAUL_SSL_PORT);
+ }
+
+ public String getKeystorePath()
+ {
+ return _config.getString("connector.ssl.keystorePath", "none");
+ }
+
+ public String getKeystorePassword()
+ {
+ return _config.getString("connector.ssl.keystorePassword", "none");
+ }
+
+ public String getCertType()
+ {
+ return _config.getString("connector.ssl.certType", "SunX509");
+ }
+
+ public boolean getQpidNIO()
+ {
+ return _config.getBoolean("connector.qpidnio", false);
+ }
+
+ public boolean getUseBiasedWrites()
+ {
+ return _config.getBoolean("advanced.useWriteBiasedPool", false);
+ }
+
+ public String getDefaultVirtualHost()
+ {
+ return _config.getString("virtualhosts.default");
+ }
+
+ public void setHousekeepingExpiredMessageCheckPeriod(long _housekeepingExpiredMessageCheckPeriod)
+ {
+ this._housekeepingExpiredMessageCheckPeriod = _housekeepingExpiredMessageCheckPeriod;
+ }
+
+ public long getHousekeepingExpiredMessageCheckPeriod()
+ {
+ return _housekeepingExpiredMessageCheckPeriod;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Feb 19 10:03:18 2009
@@ -20,266 +20,111 @@
*/
package org.apache.qpid.server.configuration;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.store.MemoryMessageStore;
public class VirtualHostConfiguration
{
- private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class);
+ private Configuration _config;
+ private String _name;
+ private Map<String, QueueConfiguration> _queues = new HashMap<String, QueueConfiguration>();
+ private Map<String, ExchangeConfiguration> _exchanges = new HashMap<String, ExchangeConfiguration>();
- private static XMLConfiguration _config;
- private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
-
-
- public VirtualHostConfiguration(String configFile) throws ConfigurationException
- {
- _logger.info("Loading Config file:" + configFile);
-
- _config = new XMLConfiguration(configFile);
-
- }
-
-
-
- private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
+ public VirtualHostConfiguration(String name, Configuration config) throws ConfigurationException
{
- _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
+ _config = config;
+ _name = name;
+ Iterator i = _config.getList("queues.queue.name").iterator();
- VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
-
-
-
- if(virtualHost == null)
+ while (i.hasNext())
{
- throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
+ String queueName = (String) i.next();
+ CompositeConfiguration mungedConf = new CompositeConfiguration();
+ mungedConf.addConfiguration(_config.subset("queues.queue." + queueName));
+ mungedConf.addConfiguration(_config.subset("queues"));
+ _queues.put(queueName, new QueueConfiguration(queueName, mungedConf));
}
- List exchangeNames = configuration.getList("exchanges.exchange.name");
-
- for(Object exchangeNameObj : exchangeNames)
+ i = _config.getList("exchanges.exchange.name").iterator();
+ int count = 0;
+ while (i.hasNext())
{
- String exchangeName = String.valueOf(exchangeNameObj);
- configureExchange(virtualHost, exchangeName, configuration);
+ CompositeConfiguration mungedConf = new CompositeConfiguration();
+ mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ + ")"));
+ mungedConf.addConfiguration(_config.subset("exchanges"));
+ String exchName = (String) i.next();
+ _exchanges.put(exchName, new ExchangeConfiguration(exchName, mungedConf));
}
-
-
- List queueNames = configuration.getList("queues.queue.name");
-
- for(Object queueNameObj : queueNames)
- {
- String queueName = String.valueOf(queueNameObj);
- configureQueue(virtualHost, queueName, configuration);
- }
-
}
- private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+ public String getName()
{
-
- CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
-
- exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
- exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
-
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
-
- AMQShortString exchangeName = new AMQShortString(exchangeNameString);
-
-
- Exchange exchange;
-
-
-
- synchronized (exchangeRegistry)
- {
- exchange = exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
- {
-
- AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
- boolean durable = exchangeConfiguration.getBoolean("durable",false);
- boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
-
- Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
- exchangeRegistry.registerExchange(newExchange);
- }
-
- }
+ return _name;
}
- public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host)
+ public long getHousekeepingExpiredMessageCheckPeriod()
{
- CompositeConfiguration queueConfiguration = null;
- if (_config == null)
- return null;
-
- Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName());
-
- if (vHostConfiguration == null)
- return null;
-
- Configuration defaultQueueConfiguration = vHostConfiguration.subset("queues");
- if (defaultQueueConfiguration != null)
- {
- queueConfiguration = new CompositeConfiguration();
- queueConfiguration.addConfiguration(defaultQueueConfiguration);
- }
-
- return queueConfiguration;
+ return _config.getLong("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingExpiredMessageCheckPeriod());
}
- private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
+ public String getAuthenticationDatabase()
{
- CompositeConfiguration queueConfiguration = new CompositeConfiguration();
-
- queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
- queueConfiguration.addConfiguration(configuration.subset("queues"));
-
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- RoutingTable routingTable = virtualHost.getRoutingTable();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
-
-
- AMQShortString queueName = new AMQShortString(queueNameString);
-
- AMQQueue queue;
+ return _config.getString("security.authentication.name");
+ }
- synchronized (queueRegistry)
- {
- queue = queueRegistry.getQueue(queueName);
+ public List getCustomExchanges()
+ {
+ return _config.getList("custom-exchanges.class-name");
+ }
- if (queue == null)
- {
- _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
-
- boolean durable = queueConfiguration.getBoolean("durable" ,false);
- boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
- String owner = queueConfiguration.getString("owner", null);
- FieldTable arguments = null;
- boolean priority = queueConfiguration.getBoolean("priority", false);
- int priorities = queueConfiguration.getInt("priorities", -1);
- if(priority || priorities > 0)
- {
- if(arguments == null)
- {
- arguments = new FieldTable();
- }
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
- }
-
-
- queue = AMQQueueFactory.createAMQQueueImpl(queueName,
- durable,
- owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */,
- virtualHost,
- arguments,
- queueConfiguration);
-
- if (queue.isDurable())
- {
- routingTable.createQueue(queue);
- }
-
- queueRegistry.registerQueue(queue);
- }
- else
- {
- _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
- }
-
- String exchangeName = queueConfiguration.getString("exchange", null);
-
- Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
-
- if(exchange == null)
- {
- exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
- }
-
- if (exchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
- }
-
- synchronized (exchange)
- {
- List routingKeys = queueConfiguration.getList("routingKey");
- if(routingKeys == null || routingKeys.isEmpty())
- {
- routingKeys = Collections.singletonList(queue.getName());
- }
-
- for(Object routingKeyNameObj : routingKeys)
- {
- AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
-
-
- queue.bind(exchange, routingKey, null);
-
-
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
- }
-
- if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
- {
- queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null);
- }
- }
+ public SecurityConfiguration getSecurityConfiguration()
+ {
+ return new SecurityConfiguration(_config.subset("security"));
+ }
- }
+ public Configuration getStoreConfiguration()
+ {
+ return _config.subset("store");
}
+ public String getRoutingTableClass()
+ {
+ return _config.getString("routingtable.class");
+ }
- public void performBindings() throws AMQException, ConfigurationException
+ public String getTransactionLogClass()
{
- List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
- String defaultVirtualHostName = _config.getString("default");
- if(defaultVirtualHostName != null)
- {
- ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
- }
- _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+ return _config.getString("store.class", MemoryMessageStore.class.getName());
+ }
- for(Object nameObject : virtualHostNames)
- {
- String name = String.valueOf(nameObject);
- configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
- }
+ public List getExchanges()
+ {
+ return _config.getList("exchanges.exchange.name");
+ }
- if (virtualHostNames == null || virtualHostNames.isEmpty())
- {
- throw new ConfigurationException(
- "Virtualhost Configuration document does not contain a valid virtualhost.");
- }
+ public ExchangeConfiguration getExchangeConfiguration(String exchangeName)
+ {
+ return _exchanges.get(exchangeName);
}
+ public String[] getQueueNames()
+ {
+ return _queues.keySet().toArray(new String[_queues.size()]);
+ }
+ public QueueConfiguration getQueueConfiguration(String queueName)
+ {
+ return _queues.get(queueName);
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Feb 19 10:03:18 2009
@@ -25,11 +25,11 @@
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -73,7 +73,8 @@
return e;
}
- public void initialise(Configuration hostConfig)
+ @Override
+ public void initialise(VirtualHostConfiguration hostConfig)
{
if (hostConfig == null)
@@ -81,7 +82,7 @@
return;
}
- for(Object className : hostConfig.getList("custom-exchanges.class-name"))
+ for(Object className : hostConfig.getCustomExchanges())
{
try
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Thu Feb 19 10:03:18 2009
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
public interface ExchangeFactory
@@ -34,7 +35,7 @@
int ticket)
throws AMQException;
- void initialise(Configuration hostConfig);
+ void initialise(VirtualHostConfiguration hostConfig);
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Thu Feb 19 10:03:18 2009
@@ -25,14 +25,16 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.HeartbeatConfig;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
@@ -92,7 +94,7 @@
ConnectionTuneBody tuneBody =
methodRegistry.createConnectionTuneBody(0xFFFF,
ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
disposeSaslServer(session);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Thu Feb 19 10:03:18 2009
@@ -23,18 +23,19 @@
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.HeartbeatConfig;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
@@ -47,8 +48,6 @@
private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler();
- private static final int DEFAULT_FRAME_SIZE = 65536;
-
public static ConnectionStartOkMethodHandler getInstance()
{
return _instance;
@@ -117,7 +116,7 @@
ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
@@ -153,8 +152,8 @@
static int getConfiguredFrameSize()
{
- final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ final ServerConfiguration config = ApplicationRegistry.getInstance().getConfiguration();
+ final int framesize = config.getFrameSize();
_logger.info("Framesize set to " + framesize);
return framesize;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Feb 19 10:03:18 2009
@@ -20,27 +20,28 @@
*/
package org.apache.qpid.server.handler;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.configuration.Configured;
-
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.routing.RoutingTable;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -53,17 +54,10 @@
return _instance;
}
- @Configured(path = "queue.auto_register", defaultValue = "true")
- public boolean autoRegister;
+ public boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
private final AtomicInteger _counter = new AtomicInteger();
-
- protected QueueDeclareHandler()
- {
- Configurator.configure(this);
- }
-
public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Feb 19 10:03:18 2009
@@ -32,9 +32,12 @@
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
+import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
@@ -85,7 +88,7 @@
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
// Retrieve the config parameters
- boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true);
+ boolean platformServer = appRegistry.getConfiguration().getPlatformMbeanserver();
_mbeanServer =
platformServer ? ManagementFactory.getPlatformMBeanServer()
@@ -105,11 +108,11 @@
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- boolean jmxmpSecurity = appRegistry.getConfiguration().getBoolean("management.security-enabled", false);
- int port = appRegistry.getConfiguration().getInt(MANAGEMENT_PORT_CONFIG_PATH, MANAGEMENT_PORT_DEFAULT);
+ boolean jmxmpSecurity = appRegistry.getConfiguration().getManagementSecurityEnabled();
+ int port = appRegistry.getConfiguration().getJMXManagementPort();
//retrieve the Principal Database assigned to JMX authentication duties
- String jmxDatabaseName = appRegistry.getConfiguration().getString("security.jmx.principal-database");
+ String jmxDatabaseName = appRegistry.getConfiguration().getJMXPrincipalDatabase();
Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();
PrincipalDatabase db = map.get(jmxDatabaseName);
@@ -154,7 +157,7 @@
RMIServerSocketFactory ssf;
//check ssl enabled option in config, default to true if option is not set
- boolean sslEnabled = appRegistry.getConfiguration().getBoolean("management.ssl.enabled", true);
+ boolean sslEnabled = appRegistry.getConfiguration().getManagementSSLEnabled();
if (sslEnabled)
{
@@ -167,7 +170,7 @@
keyStorePath = System.getProperty("javax.net.ssl.keyStore");
}
else{
- keyStorePath = appRegistry.getConfiguration().getString("management.ssl.keyStorePath", null);
+ keyStorePath = appRegistry.getConfiguration().getManagementKeyStorePath();
}
//check the keystore path value is valid
@@ -202,7 +205,7 @@
if (System.getProperty("javax.net.ssl.keyStorePassword") == null)
{
- if (appRegistry.getConfiguration().getString("management.ssl.keyStorePassword") == null)
+ if (appRegistry.getConfiguration().getManagementKeyStorePassword() == null)
{
throw new ConfigurationException("JMX management SSL keystore password not defined, " +
"unable to start requested SSL protected JMX server");
@@ -210,7 +213,7 @@
else
{
System.setProperty("javax.net.ssl.keyStorePassword",
- appRegistry.getConfiguration().getString("management.ssl.keyStorePassword"));
+ appRegistry.getConfiguration().getManagementKeyStorePassword());
}
}
@@ -379,6 +382,17 @@
// Stopping the RMI registry
UnicastRemoteObject.unexportObject(_rmiRegistry, true);
}
+ for (ObjectName name : _mbeanServer.queryNames(null, null))
+ {
+ try
+ {
+ _mbeanServer.unregisterMBean(name);
+ }
+ catch (JMException e)
+ {
+ // Really shouldn't happen, but we'll ignore that...
+ }
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Feb 19 10:03:18 2009
@@ -582,7 +582,7 @@
if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
+ _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Feb 19 10:03:18 2009
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
@@ -34,15 +37,19 @@
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.ssl.SSLContextFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
/**
* The protocol handler handles "protocol events" for all connections. The state
* associated with an individual connection is accessed through the protocol session.
@@ -56,9 +63,6 @@
private final IApplicationRegistry _applicationRegistry;
- private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
- private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
-
private final int BUFFER_READ_LIMIT_SIZE;
private final int BUFFER_WRITE_LIMIT_SIZE;
@@ -72,8 +76,8 @@
_applicationRegistry = applicationRegistry;
// Read the configuration from the application registry
- BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
- BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+ BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
+ BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
_logger.debug("AMQPFastProtocolHandler created");
}
@@ -92,17 +96,22 @@
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
-
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
- if (connectorConfig.enableExecutorPool)
+ final ServerConfiguration config = _applicationRegistry.getConfiguration();
+
+ String keystorePath = config.getKeystorePath();
+ String keystorePassword = config.getKeystorePassword();
+ String certType = config.getCertType();
+ SSLContextFactory sslContextFactory = null;
+ boolean isSsl = false;
+ if (config.getEnableSSL() && isSSLClient(config, protocolSession))
{
- if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
+ sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ isSsl = true;
+ }
+ if (config.getEnableExecutorPool())
+ {
+ if (isSsl)
{
- String keystorePath = connectorConfig.keystorePath;
- String keystorePassword = connectorConfig.keystorePassword;
- String certType = connectorConfig.certType;
- SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
new SSLFilter(sslContextFactory.buildServerContext()));
}
@@ -111,19 +120,14 @@
else
{
protocolSession.getFilterChain().addLast("protocolFilter", pcf);
- if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
+ if (isSsl)
{
- String keystorePath = connectorConfig.keystorePath;
- String keystorePassword = connectorConfig.keystorePassword;
- String certType = connectorConfig.certType;
- SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
new SSLFilter(sslContextFactory.buildServerContext()));
}
-
}
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
+ if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
{
try
{
@@ -271,10 +275,10 @@
}
}
- protected boolean isSSLClient(ConnectorConfiguration connectionConfig,
+ protected boolean isSSLClient(ServerConfiguration connectionConfig,
IoSession protocolSession)
{
InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
- return addr.getPort() == connectionConfig.sslPort;
+ return addr.getPort() == connectionConfig.getSSLPort();
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 19 10:03:18 2009
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
@@ -141,6 +142,8 @@
long getMinimumAlertRepeatGap();
+ void setMinimumAlertRepeatGap(long value);
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
@@ -162,7 +165,6 @@
void stop();
-
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
@@ -210,6 +212,4 @@
{
public void doTask(AMQQueue queue) throws AMQException;
}
-
- void configure(Configuration virtualHostDefaultQueueConfiguration);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Feb 19 10:03:18 2009
@@ -20,12 +20,11 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.AMQException;
public class AMQQueueFactory
@@ -33,25 +32,10 @@
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
public static AMQQueue createAMQQueueImpl(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- VirtualHost virtualHost, final FieldTable arguments)
-
- throws AMQException
- {
-
- return createAMQQueueImpl(name, durable, owner, autoDelete,
- virtualHost, arguments,
- VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost));
- }
-
- public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
- VirtualHost virtualHost, final FieldTable arguments,
- Configuration queueConfiguration)
+ VirtualHost virtualHost, final FieldTable arguments)
throws AMQException
{
@@ -66,13 +50,41 @@
{
q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
}
- if (q != null && queueConfiguration != null)
- {
- q.configure(queueConfiguration);
- }
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
return q;
}
+
+ public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
+ {
+ AMQShortString queueName = new AMQShortString(config.getName());
+
+ boolean durable = config.getDurable();
+ boolean autodelete = config.getAutoDelete();
+ AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null;
+ FieldTable arguments = null;
+ boolean priority = config.getPriority();
+ int priorities = config.getPriorities();
+ if(priority || priorities > 0)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+ if (priorities < 0)
+ {
+ priorities = 10;
+ }
+ arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
+ }
+
+ AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
+ q.setMaximumMessageAge(config.getMaximumMessageAge());
+ q.setMaximumQueueDepth(config.getMaximumQueueDepth());
+ q.setMaximumMessageSize(config.getMaximumMessageSize());
+ q.setMaximumMessageCount(config.getMaximumMessageCount());
+ q.setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ return q;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=745799&r1=745798&r2=745799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Thu Feb 19 10:03:18 2009
@@ -43,7 +43,7 @@
private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
private static final boolean SYNCHED_CLOCKS =
- ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
+ ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
@@ -51,7 +51,7 @@
private final TransactionalContext _txnContext;
private static final boolean MSG_AUTH =
- ApplicationRegistry.getInstance().getConfiguration().getBoolean("security.msg-auth", false);
+ ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
/**
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org