You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [20/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Oct 21 01:19:00 2011
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -27,9 +38,29 @@ import org.apache.commons.cli.OptionBuil
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.qpid.server.Broker.InitException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.xml.QpidLog4JConfigurator;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.information.management.ServerInformationMBean;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
import org.apache.qpid.server.registry.ApplicationRegistry;
-
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -37,129 +68,41 @@ import org.apache.qpid.server.registry.A
*/
public class Main
{
+ private static Logger _logger;
- private static final Option OPTION_HELP = new Option("h", "help", false, "print this message");
+ private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
- private static final Option OPTION_VERSION = new Option("v", "version", false, "print the version information and exit");
+ public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ public static final String QPID_HOME = "QPID_HOME";
+ private static final int IPV4_ADDRESS_LENGTH = 4;
- private static final Option OPTION_CONFIG_FILE =
- OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
- .create("c");
-
- private static final Option OPTION_PORT =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified port. Overrides any value in the config file")
- .withLongOpt("port").create("p");
-
- private static final Option OPTION_SSLPORT =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("SSL port. Overrides any value in the config file")
- .withLongOpt("sslport").create("s");
-
- private static final Option OPTION_EXCLUDE_0_10 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-10").create();
-
- private static final Option OPTION_EXCLUDE_0_9_1 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9-1").create();
-
- private static final Option OPTION_EXCLUDE_0_9 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9").create();
-
- private static final Option OPTION_EXCLUDE_0_8 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-8").create();
-
- private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
- .withLongOpt("jmxregistryport").create("m");
-
- private static final Option OPTION_JMX_PORT_CONNECTOR_SERVER =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified management (connector server) port. Overrides any value in the config file")
- .withLongOpt("jmxconnectorport").create();
-
- private static final Option OPTION_BIND =
- OptionBuilder.withArgName("address").hasArg()
- .withDescription("bind to the specified address. Overrides any value in the config file")
- .withLongOpt("bind").create("b");
-
- private static final Option OPTION_LOG_CONFIG_FILE =
- OptionBuilder.withArgName("file").hasArg()
- .withDescription("use the specified log4j xml configuration file. By "
- + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
- + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
-
- private static final Option OPTION_LOG_WATCH =
- OptionBuilder.withArgName("period").hasArg()
- .withDescription("monitor the log file configuration file for changes. Units are seconds. "
- + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
-
- private static final Options OPTIONS = new Options();
-
- static
- {
- OPTIONS.addOption(OPTION_HELP);
- OPTIONS.addOption(OPTION_VERSION);
- OPTIONS.addOption(OPTION_CONFIG_FILE);
- OPTIONS.addOption(OPTION_LOG_CONFIG_FILE);
- OPTIONS.addOption(OPTION_LOG_WATCH);
- OPTIONS.addOption(OPTION_PORT);
- OPTIONS.addOption(OPTION_SSLPORT);
- OPTIONS.addOption(OPTION_EXCLUDE_0_10);
- OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
- OPTIONS.addOption(OPTION_EXCLUDE_0_9);
- OPTIONS.addOption(OPTION_EXCLUDE_0_8);
- OPTIONS.addOption(OPTION_BIND);
-
- OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
- OPTIONS.addOption(OPTION_JMX_PORT_CONNECTOR_SERVER);
- }
+ private static final char IPV4_LITERAL_SEPARATOR = '.';
- private CommandLine commandLine;
-
- public static void main(String[] args)
+ protected static class InitException extends Exception
{
- //if the -Dlog4j.configuration property has not been set, enable the init override
- //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
- //finds from the classpath when we get the first Loggers
- if(System.getProperty("log4j.configuration") == null)
+ InitException(String msg, Throwable cause)
{
- System.setProperty("log4j.defaultInitOverride", "true");
+ super(msg, cause);
}
-
- new Main(args);
}
- public Main(final String[] args)
+ protected final Options options = new Options();
+ protected CommandLine commandLine;
+
+ protected Main(String[] args)
{
+ setOptions(options);
if (parseCommandline(args))
{
- try
- {
- execute();
- }
- catch(Throwable e)
- {
- System.err.println("Exception during startup: " + e);
- e.printStackTrace();
- shutdown(1);
- }
+ execute();
}
}
- protected boolean parseCommandline(final String[] args)
+ protected boolean parseCommandline(String[] args)
{
try
{
- commandLine = new PosixParser().parse(OPTIONS, args);
+ commandLine = new PosixParser().parse(options, args);
return true;
}
@@ -167,129 +110,509 @@ public class Main
{
System.err.println("Error: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("Qpid", OPTIONS, true);
+ formatter.printHelp("Qpid", options, true);
return false;
}
}
- protected void execute() throws Exception
+ @SuppressWarnings("static-access")
+ protected void setOptions(Options options)
+ {
+ Option help = new Option("h", "help", false, "print this message");
+ Option version = new Option("v", "version", false, "print the version information and exit");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+ .create("c");
+ Option port =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified port. Overrides any value in the config file")
+ .withLongOpt("port").create("p");
+
+ Option exclude0_10 =
+ OptionBuilder.withArgName("exclude-0-10").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-10").create();
+
+ Option exclude0_9_1 =
+ OptionBuilder.withArgName("exclude-0-9-1").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9-1").create();
+
+
+ Option exclude0_9 =
+ OptionBuilder.withArgName("exclude-0-9").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9").create();
+
+
+ Option exclude0_8 =
+ OptionBuilder.withArgName("exclude-0-8").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-8").create();
+
+
+ Option mport =
+ OptionBuilder.withArgName("mport").hasArg()
+ .withDescription("listen on the specified management port. Overrides any value in the config file")
+ .withLongOpt("mport").create("m");
+
+
+ Option bind =
+ OptionBuilder.withArgName("bind").hasArg()
+ .withDescription("bind to the specified address. Overrides any value in the config file")
+ .withLongOpt("bind").create("b");
+ Option logconfig =
+ OptionBuilder.withArgName("logconfig").hasArg()
+ .withDescription("use the specified log4j xml configuration file. By "
+ + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+ Option logwatchconfig =
+ OptionBuilder.withArgName("logwatch").hasArg()
+ .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+
+ options.addOption(help);
+ options.addOption(version);
+ options.addOption(configFile);
+ options.addOption(logconfig);
+ options.addOption(logwatchconfig);
+ options.addOption(port);
+ options.addOption(exclude0_10);
+ options.addOption(exclude0_9_1);
+ options.addOption(exclude0_9);
+ options.addOption(exclude0_8);
+ options.addOption(mport);
+ options.addOption(bind);
+ }
+
+ protected void execute()
{
- BrokerOptions options = new BrokerOptions();
- String configFile = commandLine.getOptionValue(OPTION_CONFIG_FILE.getOpt());
- if(configFile != null)
+ // note this understands either --help or -h. If an option only has a long name you can use that but if
+ // an option has a short name and a long name you must use the short name here.
+ if (commandLine.hasOption("h"))
{
- options.setConfigFile(configFile);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("Qpid", options, true);
}
+ else if (commandLine.hasOption("v"))
+ {
+ String ver = QpidProperties.getVersionString();
- String logWatchConfig = commandLine.getOptionValue(OPTION_LOG_WATCH.getOpt());
- if(logWatchConfig != null)
+ StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+ boolean first = true;
+ for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ protocol.append(", ");
+ }
+
+ protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
+ }
+
+ System.out.println(ver + " (" + protocol + ")");
+ }
+ else
{
- options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
+ try
+ {
+ CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
+ startup();
+ CurrentActor.remove();
+ }
+ catch (InitException e)
+ {
+ System.out.println("Initialisation Error : " + e.getMessage());
+ shutdown(1);
+ }
+ catch (Throwable e)
+ {
+ System.out.println("Error initialising message broker: " + e);
+ e.printStackTrace();
+ shutdown(1);
+ }
}
+ }
+
+ protected void shutdown(int status)
+ {
+ ApplicationRegistry.removeAll();
+ System.exit(status);
+ }
- String logConfig = commandLine.getOptionValue(OPTION_LOG_CONFIG_FILE.getOpt());
- if(logConfig != null)
+ protected void startup() throws Exception
+ {
+ final String QpidHome = System.getProperty(QPID_HOME);
+ final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+ final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+ if (!configFile.exists())
+ {
+ String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
+
+ if (QpidHome == null)
+ {
+ error = error + "\nNote: " + QPID_HOME + " is not set.";
+ }
+
+ throw new InitException(error, null);
+ }
+ else
{
- options.setLogConfigFile(logConfig);
+ CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
}
- String jmxPortRegistryServer = commandLine.getOptionValue(OPTION_JMX_PORT_REGISTRY_SERVER.getOpt());
- if(jmxPortRegistryServer != null)
+ String logConfig = commandLine.getOptionValue("l");
+ String logWatchConfig = commandLine.getOptionValue("w", "0");
+
+ int logWatchTime = 0;
+ try
+ {
+ logWatchTime = Integer.parseInt(logWatchConfig);
+ }
+ catch (NumberFormatException e)
{
- options.setJmxPortRegistryServer(Integer.parseInt(jmxPortRegistryServer));
+ System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ + "a non-negative integer. Using default of zero (no watching configured");
}
- String jmxPortConnectorServer = commandLine.getOptionValue(OPTION_JMX_PORT_CONNECTOR_SERVER.getLongOpt());
- if(jmxPortConnectorServer != null)
+ File logConfigFile;
+ if (logConfig != null)
+ {
+ logConfigFile = new File(logConfig);
+ configureLogging(logConfigFile, logWatchTime);
+ }
+ else
{
- options.setJmxPortConnectorServer(Integer.parseInt(jmxPortConnectorServer));
+ File configFileDirectory = configFile.getParentFile();
+ logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+ configureLogging(logConfigFile, logWatchTime);
}
- String bindAddr = commandLine.getOptionValue(OPTION_BIND.getOpt());
- if (bindAddr != null)
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+ ServerConfiguration serverConfig = config.getConfiguration();
+ updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
+
+ ApplicationRegistry.initialise(config);
+
+ // We have already loaded the BrokerMessages class by this point so we
+ // need to refresh the locale setting in case we had a different value in
+ // the configuration.
+ BrokerMessages.reload();
+
+ // AR.initialise() sets and removes its own actor so we now need to set the actor
+ // for the remainder of the startup, and the default actor if the stack is empty
+ CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
+ CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
+ GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
+
+
+ try
{
- options.setBind(bindAddr);
+ configureLoggingManagementMBean(logConfigFile, logWatchTime);
+
+ ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
+ configMBean.register();
+
+ ServerInformationMBean sysInfoMBean =
+ new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+ sysInfoMBean.register();
+
+
+ String[] portStr = commandLine.getOptionValues("p");
+
+ Set<Integer> ports = new HashSet<Integer>();
+ Set<Integer> exclude_0_10 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9 = new HashSet<Integer>();
+ Set<Integer> exclude_0_8 = new HashSet<Integer>();
+
+ if(portStr == null || portStr.length == 0)
+ {
+
+ parsePortList(ports, serverConfig.getPorts());
+ parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+ parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+ parsePortList(exclude_0_9, serverConfig.getPortExclude09());
+ parsePortList(exclude_0_8, serverConfig.getPortExclude08());
+
+ }
+ else
+ {
+ parsePortArray(ports, portStr);
+ parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+ parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
+ parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
+ parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
+
+ }
+
+
+
+
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+
+
+
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(0).getAddress();
+ }
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String hostName = bindAddress.getCanonicalHostName();
+
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+
+ if (!serverConfig.getSSLOnly())
+ {
+
+ for(int port : ports)
+ {
+
+ NetworkDriver driver = new MINANetworkDriver();
+
+ Set<VERSION> supported = EnumSet.allOf(VERSION.class);
+
+ if(exclude_0_10.contains(port))
+ {
+ supported.remove(VERSION.v0_10);
+ }
+
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(VERSION.v0_9_1);
+ }
+ if(exclude_0_9.contains(port))
+ {
+ supported.remove(VERSION.v0_9);
+ }
+ if(exclude_0_8.contains(port))
+ {
+ supported.remove(VERSION.v0_8);
+ }
+
+ MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
+
+
+
+ driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
+
+ }
+
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ CurrentActor.get().message(BrokerMessages.READY());
+
+ }
+ finally
+ {
+ // Startup is complete so remove the AR initialised Startup actor
+ CurrentActor.remove();
}
- String[] portStr = commandLine.getOptionValues(OPTION_PORT.getOpt());
+
+
+ }
+
+ private void parsePortArray(Set<Integer> ports, String[] portStr)
+ throws InitException
+ {
if(portStr != null)
{
- parsePortArray(options, portStr, false);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
+ for(int i = 0; i < portStr.length; i++)
{
- parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+ try
+ {
+ ports.add(Integer.parseInt(portStr[i]));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + portStr[i], e);
+ }
}
}
+ }
- String[] sslPortStr = commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
- if(sslPortStr != null)
+ private void parsePortList(Set<Integer> output, List input)
+ throws InitException
+ {
+ if(input != null)
{
- parsePortArray(options, sslPortStr, true);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
+ for(Object port : input)
{
- parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+ try
+ {
+ output.add(Integer.parseInt(String.valueOf(port)));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + port, e);
+ }
}
}
-
- startBroker(options);
}
- protected void startBroker(final BrokerOptions options) throws Exception
+ /**
+ * Update the configuration data with the management port.
+ * @param configuration
+ * @param managementPort The string from the command line
+ */
+ private void updateManagementPort(ServerConfiguration configuration, String managementPort)
{
- Broker broker = new Broker();
- broker.startup(options);
+ if (managementPort != null)
+ {
+ try
+ {
+ configuration.setJMXManagementPort(Integer.parseInt(managementPort));
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
+ }
+ }
}
- protected void shutdown(final int status)
+ public static void main(String[] args)
{
- ApplicationRegistry.remove();
- System.exit(status);
+ //if the -Dlog4j.configuration property has not been set, enable the init override
+ //to stop Log4J wandering off and picking up the first log4j.xml/properties file it
+ //finds from the classpath when we get the first Loggers
+ if(System.getProperty("log4j.configuration") == null)
+ {
+ System.setProperty("log4j.defaultInitOverride", "true");
+ }
+
+ //now that the override status is known, we can instantiate the Loggers
+ _logger = Logger.getLogger(Main.class);
+
+ new Main(args);
}
- private static void parsePortArray(final BrokerOptions options,final Object[] ports,
- final boolean ssl) throws InitException
+ private byte[] parseIP(String address) throws Exception
{
- if(ports != null)
+ char[] literalBuffer = address.toCharArray();
+ int byteCount = 0;
+ int currByte = 0;
+ byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
+ for (int i = 0; i < literalBuffer.length; i++)
{
- for(int i = 0; i < ports.length; i++)
+ char currChar = literalBuffer[i];
+ if ((currChar >= '0') && (currChar <= '9'))
+ {
+ currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
+ }
+
+ if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
{
+ ip[byteCount++] = (byte) currByte;
+ currByte = 0;
+ }
+ }
+
+ if (byteCount != 4)
+ {
+ throw new Exception("Invalid IP address: " + address);
+ }
+ return ip;
+ }
+
+ private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
+ {
+ if (logConfigFile.exists() && logConfigFile.canRead())
+ {
+ CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
+
+ if (logWatchTime > 0)
+ {
+ System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+ + logWatchTime + " seconds");
+ // log4j expects the watch interval in milliseconds
try
{
- if(ssl)
- {
- options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
- }
- else
- {
- options.addPort(Integer.parseInt(String.valueOf(ports[i])));
- }
+ QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
- catch (NumberFormatException e)
+ catch (Exception e)
{
- throw new InitException("Invalid port: " + ports[i], e);
+ throw new InitException(e.getMessage(),e);
+ }
+ }
+ else
+ {
+ try
+ {
+ QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ }
+ catch (Exception e)
+ {
+ throw new InitException(e.getMessage(),e);
}
}
}
- }
-
- private static void parsePortArray(final BrokerOptions options, final Object[] ports,
- final ProtocolExclusion excludedProtocol) throws InitException
- {
- if(ports != null)
+ else
{
- for(int i = 0; i < ports.length; i++)
+ System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+ System.err.println("Using the fallback internal log4j.properties configuration");
+
+ InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
+ if(propsFile == null)
+ {
+ throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
+ }
+ else
{
try
{
- options.addExcludedPort(excludedProtocol,
- Integer.parseInt(String.valueOf(ports[i])));
+ Properties fallbackProps = new Properties();
+ fallbackProps.load(propsFile);
+ PropertyConfigurator.configure(fallbackProps);
}
- catch (NumberFormatException e)
+ finally
{
- throw new InitException("Invalid port for exclusion: " + ports[i], e);
+ propsFile.close();
}
}
}
}
+
+ private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
+ {
+ LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
+
+ blm.register();
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Oct 21 01:19:00 2011
@@ -20,8 +20,6 @@
package org.apache.qpid.server.configuration;
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
@@ -39,28 +37,33 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
-public class ServerConfiguration extends ConfigurationPlugin
+public class ServerConfiguration extends ConfigurationPlugin implements SignalHandler
{
protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
// Default Configuration values
- public static final int DEFAULT_BUFFER_SIZE = 262144;
+ public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
+ public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
+ public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false;
public static final String DEFAULT_STATUS_UPDATES = "on";
public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED";
public static final int DEFAULT_FRAME_SIZE = 65536;
public static final int DEFAULT_PORT = 5672;
- public static final int DEFAULT_SSL_PORT = 5671;
+ public static final int DEFAULT_SSL_PORT = 8672;
public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
- public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
- public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
-
+ public static final int DEFAULT_JMXPORT = 8999;
+
public static final String QPID_HOME = "QPID_HOME";
public static final String QPID_WORK = "QPID_WORK";
public static final String LIB_DIR = "lib";
@@ -72,14 +75,19 @@ public class ServerConfiguration extends
private File _configFile;
private File _vhostsFile;
+ private Logger _log = Logger.getLogger(this.getClass());
+
+ private ConfigurationManagementMBean _mbean;
+
// Map of environment variables to config items
private static final Map<String, String> envVarMap = new HashMap<String, String>();
// Configuration values to be read from the configuration file
//todo Move all properties to static values to ensure system testing can be performed.
+ public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled";
+ public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize";
+ public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize";
public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
- public static final String MGMT_JMXPORT_REGISTRYSERVER = "management.jmxport.registryServer";
- public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
@@ -87,9 +95,9 @@ public class ServerConfiguration extends
envVarMap.put("QPID_PORT", "connector.port");
envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers");
envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
+ envVarMap.put("QPID_NIO", "connector.qpidnio");
envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
- envVarMap.put("QPID_JMXPORT_REGISTRYSERVER", MGMT_JMXPORT_REGISTRYSERVER);
- envVarMap.put("QPID_JMXPORT_CONNECTORSERVER", MGMT_JMXPORT_CONNECTORSERVER);
+ envVarMap.put("QPID_JMXPORT", "management.jmxport");
envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
envVarMap.put("QPID_MSGAUTH", "security.msg-auth");
envVarMap.put("QPID_AUTOREGISTER", "auto_register");
@@ -123,7 +131,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explicitly call {@link #initialise()} as this is done via the
+ * need to explictly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param configurationURL
@@ -133,26 +141,15 @@ public class ServerConfiguration extends
{
this(parseConfig(configurationURL));
_configFile = configurationURL;
-
- SignalHandlerTask hupReparseTask = new SignalHandlerTask()
+ try
{
- public void handle()
- {
- try
- {
- reparseConfigFileSecuritySections();
- }
- catch (ConfigurationException e)
- {
- _logger.error("Could not reload configuration file security sections", e);
- }
- }
- };
-
- if(!hupReparseTask.register("HUP"))
+ Signal sig = new sun.misc.Signal("HUP");
+ sun.misc.Signal.handle(sig, this);
+ }
+ catch (Exception e)
{
- _logger.info("Unable to register Signal HUP handler to reload security configuration.");
- _logger.info("Signal HUP not supported for this OS / JVM combination - " + SignalHandlerTask.getPlatformDescription());
+ _logger.error("Signal HUP not supported for OS: " + System.getProperty("os.name"));
+ // We're on something that doesn't handle SIGHUP, how sad, Windows.
}
}
@@ -169,7 +166,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explicitly call {@link #initialise()} as this is done via the
+ * need to explictly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param conf
@@ -208,53 +205,7 @@ public class ServerConfiguration extends
@Override
public void validateConfiguration() throws ConfigurationException
{
- // Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL.
- // This ensure that users remove the element from their configuration file.
-
- if (getListValue("security.jmx.access").size() > 0)
- {
- String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml."
- + (_configFile == null ? "" : " Configuration file : " + _configFile);
- throw new ConfigurationException(message);
- }
-
- if (getListValue("security.jmx.principal-database").size() > 0)
- {
- String message = "Validation error : security/jmx/principal-database is no longer a supported element within the configuration xml."
- + (_configFile == null ? "" : " Configuration file : " + _configFile);
- throw new ConfigurationException(message);
- }
-
- if (getListValue("security.principal-databases.principal-database(0).class").size() > 0)
- {
- String message = "Validation error : security/principal-databases is no longer supported within the configuration xml."
- + (_configFile == null ? "" : " Configuration file : " + _configFile);
- throw new ConfigurationException(message);
- }
-
- // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency
- if (contains("housekeeping.expiredMessageCheckPeriod"))
- {
- String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
- + (_configFile == null ? "" : " Configuration file : " + _configFile);
- throw new ConfigurationException(message);
- }
-
- // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
- // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
- for (String key : new String[] {"management.ssl.keystorePath",
- "management.ssl.keystorePassword," +
- "connector.ssl.keystorePath",
- "connector.ssl.keystorePassword"})
- {
- if (contains(key))
- {
- final String deprecatedXpath = key.replaceAll("\\.", "/");
- final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
- _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
- + (_configFile == null ? "" : " Configuration file : " + _configFile));
- }
- }
+ //Currently doesn't do validation
}
/*
@@ -420,7 +371,7 @@ public class ServerConfiguration extends
public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
- // interpolation takes place across the entirety of the
+ // interpolation takes place accross the entirety of the
// composite configuration. Without doing this each
// configuration object only interpolates variables defined
// inside itself.
@@ -447,6 +398,18 @@ public class ServerConfiguration extends
return _configFile == null ? "" : _configFile.getAbsolutePath();
}
+ public void handle(Signal arg0)
+ {
+ try
+ {
+ reparseConfigFileSecuritySections();
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.error("Could not reload configuration file security sections", e);
+ }
+ }
+
public void reparseConfigFileSecuritySections() throws ConfigurationException
{
if (_configFile != null)
@@ -490,24 +453,14 @@ public class ServerConfiguration extends
return System.getProperty(QPID_HOME);
}
- public void setJMXPortRegistryServer(int registryServerPort)
- {
- getConfig().setProperty(MGMT_JMXPORT_REGISTRYSERVER, registryServerPort);
- }
-
- public int getJMXPortRegistryServer()
+ public void setJMXManagementPort(int mport)
{
- return getIntValue(MGMT_JMXPORT_REGISTRYSERVER, DEFAULT_JMXPORT_REGISTRYSERVER);
+ getConfig().setProperty("management.jmxport", mport);
}
- public void setJMXPortConnectorServer(int connectorServerPort)
+ public int getJMXManagementPort()
{
- getConfig().setProperty(MGMT_JMXPORT_CONNECTORSERVER, connectorServerPort);
- }
-
- public int getJMXConnectorServerPort()
- {
- return getIntValue(MGMT_JMXPORT_CONNECTORSERVER, getJMXPortRegistryServer() + JMXPORT_CONNECTORSERVER_OFFSET);
+ return getIntValue("management.jmxport", DEFAULT_JMXPORT);
}
public boolean getUseCustomRMISocketFactory()
@@ -550,11 +503,58 @@ public class ServerConfiguration extends
_virtualHosts.put(config.getName(), config);
}
+ public List<String> getPrincipalDatabaseNames()
+ {
+ return getListValue("security.principal-databases.principal-database.name");
+ }
+
+ public List<String> getPrincipalDatabaseClass()
+ {
+ return getListValue("security.principal-databases.principal-database.class");
+ }
+
+ public List<String> getPrincipalDatabaseAttributeNames(int index)
+ {
+ String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name";
+ return getListValue(name);
+ }
+
+ public List<String> getPrincipalDatabaseAttributeValues(int index)
+ {
+ String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value";
+ return getListValue(name);
+ }
+
+ public List<String> getManagementPrincipalDBs()
+ {
+ return getListValue("security.jmx.principal-database");
+ }
+
+ public List<String> getManagementAccessList()
+ {
+ return getListValue("security.jmx.access");
+ }
+
public int getFrameSize()
{
return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE);
}
+ public boolean getProtectIOEnabled()
+ {
+ return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED);
+ }
+
+ public int getBufferReadLimit()
+ {
+ return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE);
+ }
+
+ public int getBufferWriteLimit()
+ {
+ return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
+ }
+
public boolean getSynchedClocks()
{
return getBooleanValue("advanced.synced-clocks");
@@ -565,10 +565,14 @@ public class ServerConfiguration extends
return getBooleanValue("security.msg-auth");
}
+ public String getJMXPrincipalDatabase()
+ {
+ return getStringValue("security.jmx.principal-database");
+ }
+
public String getManagementKeyStorePath()
{
- final String fallback = getStringValue("management.ssl.keystorePath");
- return getStringValue("management.ssl.keyStorePath", fallback);
+ return getStringValue("management.ssl.keyStorePath");
}
public boolean getManagementSSLEnabled()
@@ -578,8 +582,7 @@ public class ServerConfiguration extends
public String getManagementKeyStorePassword()
{
- final String fallback = getStringValue("management.ssl.keystorePassword");
- return getStringValue("management.ssl.keyStorePassword", fallback);
+ return getStringValue("management.ssl.keyStorePassword");
}
public boolean getQueueAutoRegister()
@@ -647,14 +650,14 @@ public class ServerConfiguration extends
return getLongValue("flowResumeCapacity", getCapacity());
}
- public int getConnectorProcessors()
+ public int getProcessors()
{
return getIntValue("connector.processors", 4);
}
public List getPorts()
{
- return getListValue("connector.port", Collections.<Integer>singletonList(DEFAULT_PORT));
+ return getListValue("connector.port", Collections.singletonList(DEFAULT_PORT));
}
public List getPortExclude010()
@@ -679,17 +682,17 @@ public class ServerConfiguration extends
public String getBind()
{
- return getStringValue("connector.bind", WILDCARD_ADDRESS);
+ return getStringValue("connector.bind", "wildcard");
}
public int getReceiveBufferSize()
{
- return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE);
+ return getIntValue("connector.socketReceiveBuffer", 32767);
}
public int getWriteBufferSize()
{
- return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE);
+ return getIntValue("connector.socketWriteBuffer", 32767);
}
public boolean getTcpNoDelay()
@@ -712,28 +715,31 @@ public class ServerConfiguration extends
return getBooleanValue("connector.ssl.sslOnly");
}
- public List getSSLPorts()
+ public int getSSLPort()
{
- return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
+ return getIntValue("connector.ssl.port", DEFAULT_SSL_PORT);
}
- public String getConnectorKeyStorePath()
+ public String getKeystorePath()
{
- final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
- return getStringValue("connector.ssl.keyStorePath", fallback);
+ return getStringValue("connector.ssl.keystorePath", "none");
}
- public String getConnectorKeyStorePassword()
+ public String getKeystorePassword()
{
- final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
- return getStringValue("connector.ssl.keyStorePassword", fallback);
+ return getStringValue("connector.ssl.keystorePassword", "none");
}
- public String getConnectorCertType()
+ public String getCertType()
{
return getStringValue("connector.ssl.certType", "SunX509");
}
+ public boolean getQpidNIO()
+ {
+ return getBooleanValue("connector.qpidnio");
+ }
+
public boolean getUseBiasedWrites()
{
return getBooleanValue("advanced.useWriteBiasedPool");
@@ -749,44 +755,69 @@ public class ServerConfiguration extends
getConfig().setProperty("virtualhosts.default", vhost);
}
- public void setHousekeepingCheckPeriod(long value)
+ public void setHousekeepingExpiredMessageCheckPeriod(long value)
{
- getConfig().setProperty("housekeeping.checkPeriod", value);
+ getConfig().setProperty("housekeeping.expiredMessageCheckPeriod", value);
}
public long getHousekeepingCheckPeriod()
{
- return getLongValue("housekeeping.checkPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+ return getLongValue("housekeeping.checkPeriod",
+ getLongValue("housekeeping.expiredMessageCheckPeriod",
+ DEFAULT_HOUSEKEEPING_PERIOD));
}
- public long getStatisticsSamplePeriod()
+ public NetworkDriverConfiguration getNetworkConfiguration()
{
- return getConfig().getLong("statistics.sample.period", 5000L);
- }
+ return new NetworkDriverConfiguration()
+ {
- public boolean isStatisticsGenerationBrokerEnabled()
- {
- return getConfig().getBoolean("statistics.generation.broker", false);
- }
+ public Integer getTrafficClass()
+ {
+ return null;
+ }
- public boolean isStatisticsGenerationVirtualhostsEnabled()
- {
- return getConfig().getBoolean("statistics.generation.virtualhosts", false);
- }
+ public Boolean getTcpNoDelay()
+ {
+ // Can't call parent getTcpNoDelay since it just calls this one
+ return getBooleanValue("connector.tcpNoDelay", true);
+ }
- public boolean isStatisticsGenerationConnectionsEnabled()
- {
- return getConfig().getBoolean("statistics.generation.connections", false);
- }
+ public Integer getSoTimeout()
+ {
+ return null;
+ }
- public long getStatisticsReportingPeriod()
- {
- return getConfig().getLong("statistics.reporting.period", 0L);
- }
+ public Integer getSoLinger()
+ {
+ return null;
+ }
- public boolean isStatisticsReportResetEnabled()
- {
- return getConfig().getBoolean("statistics.reporting.reset", false);
+ public Integer getSendBufferSize()
+ {
+ return getBufferWriteLimit();
+ }
+
+ public Boolean getReuseAddress()
+ {
+ return null;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return getBufferReadLimit();
+ }
+
+ public Boolean getOOBInline()
+ {
+ return null;
+ }
+
+ public Boolean getKeepAlive()
+ {
+ return null;
+ }
+ };
}
public int getMaxChannelCount()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Oct 21 01:19:00 2011
@@ -86,9 +86,9 @@ public class VirtualHostConfiguration ex
return _name;
}
- public long getHousekeepingCheckPeriod()
+ public long getHousekeepingExpiredMessageCheckPeriod()
{
- return getLongValue("housekeeping.checkPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
+ return getLongValue("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
}
public String getAuthenticationDatabase()
@@ -306,45 +306,11 @@ public class VirtualHostConfiguration ex
@Override
public void validateConfiguration() throws ConfigurationException
{
- // QPID-3249. Support for specifying authentication name at vhost level is no longer supported.
- if (getListValue("security.authentication.name").size() > 0)
- {
- String message = "Validation error : security/authentication/name is no longer a supported element within the configuration xml."
- + " It appears in virtual host definition : " + _name;
- throw new ConfigurationException(message);
- }
-
- // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency
- if (contains("housekeeping.expiredMessageCheckPeriod"))
- {
- String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
- + " It appears in virtual host definition : " + _name;
- throw new ConfigurationException(message);
- }
+ //Currently doesn't do validation
}
public int getHouseKeepingThreadCount()
{
return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors());
}
-
- public long getTransactionTimeoutOpenWarn()
- {
- return getLongValue("transactionTimeout.openWarn", 0L);
- }
-
- public long getTransactionTimeoutOpenClose()
- {
- return getLongValue("transactionTimeout.openClose", 0L);
- }
-
- public long getTransactionTimeoutIdleWarn()
- {
- return getLongValue("transactionTimeout.idleWarn", 0L);
- }
-
- public long getTransactionTimeoutIdleClose()
- {
- return getLongValue("transactionTimeout.idleClose", 0L);
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.ConfigurationManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import java.util.Collections;
import java.util.HashMap;
@@ -139,28 +138,10 @@ public abstract class ConfigurationPlugi
}
}
- offerRemainingConfigurationToOtherPlugins(path, configuration, elements);
-
- validateConfiguration();
- }
-
- private void offerRemainingConfigurationToOtherPlugins(String path,
- Configuration configuration, Set<String> elements) throws ConfigurationException
- {
- final IApplicationRegistry appRegistry = safeGetApplicationRegistryInstance();
-
- if (appRegistry == null)
- {
- // We see this happen during shutdown due to asynchronous reconfig using IO threads.
- // Need to remove the responsibility for offering configuration to other class.
- _logger.info("Cannot offer remaining config to other plugins, can't find app registry");
- return;
- }
-
- final ConfigurationManager configurationManager = appRegistry.getConfigurationManager();
// Process the elements in the configuration
for (String element : elements)
{
+ ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
String configurationElement = element;
@@ -181,18 +162,8 @@ public abstract class ConfigurationPlugi
_pluginConfiguration.put(plugin.getClass().getName(), plugin);
}
}
- }
- private IApplicationRegistry safeGetApplicationRegistryInstance()
- {
- try
- {
- return ApplicationRegistry.getInstance();
- }
- catch (IllegalStateException ise)
- {
- return null;
- }
+ validateConfiguration();
}
/** Helper method to print out list of keys in a {@link Configuration}. */
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,19 @@
*/
package org.apache.qpid.server.connection;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
- private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
+ private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
@@ -42,46 +40,44 @@ public class ConnectionRegistry implemen
{
// None required
}
-
- /** Close all of the currently open connections. */
- public void close()
+
+ public void expireClosedChannels()
{
- _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
- while (!_registry.isEmpty())
+ for (AMQProtocolSession connection : _registry)
{
- AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+ connection.closeIfLingeringClosedChannels();
}
}
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+ /** Close all of the currently open connections. */
+ public void close()
{
- try
- {
- connection.close(cause, message);
- }
- catch (TransportException e)
- {
- _logger.warn("Error closing connection:" + e.getMessage());
- }
- catch (AMQException e)
+ while (!_registry.isEmpty())
{
- _logger.warn("Error closing connection:" + e.getMessage());
+ AMQProtocolSession connection = _registry.get(0);
+
+ try
+ {
+ connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
+ 0, 0,
+ connection.getProtocolOutputConverter().getProtocolMajorVersion(),
+ connection.getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
+ }
}
}
- public void registerConnection(AMQConnectionModel connnection)
+ public void registerConnection(AMQProtocolSession connnection)
{
_registry.add(connnection);
}
- public void deregisterConnection(AMQConnectionModel connnection)
+ public void deregisterConnection(AMQProtocolSession connnection)
{
_registry.remove(connnection);
}
-
- public List<AMQConnectionModel> getConnections()
- {
- return new ArrayList<AMQConnectionModel>(_registry);
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Oct 21 01:19:00 2011
@@ -20,23 +20,18 @@
*/
package org.apache.qpid.server.connection;
-import java.util.List;
-
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
public interface IConnectionRegistry
{
+
public void initialise();
public void close() throws AMQException;
-
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
- public List<AMQConnectionModel> getConnections();
- public void registerConnection(AMQConnectionModel connnection);
+ public void registerConnection(AMQProtocolSession connnection);
+
+ public void deregisterConnection(AMQProtocolSession connnection);
- public void deregisterConnection(AMQConnectionModel connnection);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Oct 21 01:19:00 2011
@@ -356,7 +356,7 @@ public abstract class AbstractExchange i
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
final ArrayList<? extends BaseQueue> queues = doRoute(message);
- if(!queues.isEmpty())
+ if(queues != null && !queues.isEmpty())
{
_routedMessageCount.incrementAndGet();
_routedMessageSize.addAndGet(message.getSize());
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java Fri Oct 21 01:19:00 2011
@@ -90,12 +90,12 @@ public abstract class AbstractExchangeMB
public String getObjectInstanceName()
{
- return ObjectName.quote(_exchange.getName());
+ return _exchange.getNameShortString().toString();
}
public String getName()
{
- return _exchange.getName();
+ return _exchange.getNameShortString().toString();
}
public String getExchangeType()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Oct 21 01:19:00 2011
@@ -30,12 +30,15 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ExchangeConfig;
import javax.management.JMException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
public interface Exchange extends ExchangeReferrer, ExchangeConfig
{
@@ -64,12 +67,7 @@ public interface Exchange extends Exchan
void close() throws AMQException;
- /**
- * Returns a list of queues to which to route this message. If there are
- * no queues the empty list must be returned.
- *
- * @return list of queues to which to route the message.
- */
+
ArrayList<? extends BaseQueue> route(InboundMessage message);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java Fri Oct 21 01:19:00 2011
@@ -274,6 +274,132 @@ public class HeadersParser
}
+ public static void main(String[] args) throws AMQFrameDecodingException
+ {
+
+ FieldTable bindingTable = new FieldTable();
+
+ bindingTable.setString(new AMQShortString("x-match"),"all");
+ bindingTable.setInteger("a",1);
+ bindingTable.setVoid(new AMQShortString("b"));
+ bindingTable.setString("c","");
+ bindingTable.setInteger("d",4);
+ bindingTable.setInteger("e",1);
+
+
+
+ FieldTable bindingTable2 = new FieldTable();
+ bindingTable2.setString(new AMQShortString("x-match"),"all");
+ bindingTable2.setInteger("a",1);
+ bindingTable2.setVoid(new AMQShortString("b"));
+ bindingTable2.setString("c","");
+ bindingTable2.setInteger("d",4);
+ bindingTable2.setInteger("e",1);
+ bindingTable2.setInteger("f",1);
+
+
+ FieldTable table = new FieldTable();
+ table.setInteger("a",1);
+ table.setInteger("b",2);
+ table.setString("c","");
+ table.setInteger("d",4);
+ table.setInteger("e",1);
+ table.setInteger("f",1);
+ table.setInteger("h",1);
+ table.setInteger("i",1);
+ table.setInteger("j",1);
+ table.setInteger("k",1);
+ table.setInteger("l",1);
+
+ org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize());
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ buffer.flip();
+
+ FieldTable table2 = EncodingUtils.readFieldTable(buffer);
+
+
+
+ FieldTable bindingTable3 = new FieldTable();
+ bindingTable3.setString(new AMQShortString("x-match"),"any");
+ bindingTable3.setInteger("a",1);
+ bindingTable3.setInteger("b",3);
+
+
+ FieldTable bindingTable4 = new FieldTable();
+ bindingTable4.setString(new AMQShortString("x-match"),"any");
+ bindingTable4.setVoid(new AMQShortString("a"));
+
+
+ FieldTable bindingTable5 = new FieldTable();
+ bindingTable5.setString(new AMQShortString("x-match"),"all");
+ bindingTable5.setString(new AMQShortString("h"),"hello");
+
+ for(int i = 0; i < 100; i++)
+ {
+ printMatches(new FieldTable[] {bindingTable5} , table2);
+ }
+
+
+
+ }
+
+
+
+ private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey)
+ {
+ HeadersMatcherDFAState sm = null;
+ Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>();
+
+ HeadersParser parser = new HeadersParser();
+
+ for(int i = 0; i < bindingKeys.length; i++)
+ {
+ HeaderMatcherResult r = new HeaderMatcherResult();
+ resultMap.put(r, bindingKeys[i].toString());
+
+
+ if(i==0)
+ {
+ sm = parser.createStateMachine(bindingKeys[i], r);
+ }
+ else
+ {
+ sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r));
+ }
+ }
+
+ Collection<HeaderMatcherResult> results = null;
+ long beforeTime = System.currentTimeMillis();
+ for(int i = 0; i < 1000000; i++)
+ {
+ routingKey.size();
+
+ assert sm != null;
+ results = sm.match(routingKey);
+
+ }
+ long elapsed = System.currentTimeMillis() - beforeTime;
+ System.out.println("1000000 Iterations took: " + elapsed);
+ Collection<String> resultStrings = new ArrayList<String>();
+
+ assert results != null;
+ for(HeaderMatcherResult result : results)
+ {
+ resultStrings.add(resultMap.get(result));
+ }
+
+ final ArrayList<String> nonMatches = new ArrayList<String>();
+ for(FieldTable key : bindingKeys)
+ {
+ nonMatches.add(key.toString());
+ }
+ nonMatches.removeAll(resultStrings);
+ System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
+
+
+ }
+
+
public final static class KeyValuePair
{
public final HeaderKey _key;
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Fri Oct 21 01:19:00 2011
@@ -38,7 +38,6 @@ import org.apache.qpid.server.queue.Base
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -697,7 +696,7 @@ public class Bridge implements BridgeCon
//TODO Handle the passing of non-null Filters and Arguments here
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
_destination,
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
@@ -769,7 +768,7 @@ public class Bridge implements BridgeCon
//TODO Handle the passing of non-null Filters and Arguments here
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
_destination,
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java Fri Oct 21 01:19:00 2011
@@ -258,6 +258,7 @@ public class BrokerLink implements LinkC
_remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
}
_qpidConnection.setSessionFactory(new SessionFactory());
+ _qpidConnection.setAuthorizationID(_username == null ? "" : _username);
updateState(State.ESTABLISHING, State.OPERATIONAL);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Fri Oct 21 01:19:00 2011
@@ -37,8 +37,8 @@ import org.apache.qpid.server.queue.Filt
public class PropertyExpression implements Expression
{
// Constants - defined the same as JMS
- private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
-
+ private static final int NON_PERSISTENT = 1;
+ private static final int PERSISTENT = 2;
private static final int DEFAULT_PRIORITY = 4;
private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
@@ -172,14 +172,13 @@ public class PropertyExpression implemen
{
public Object evaluate(Filterable message)
{
- JMSDeliveryMode mode = message.isPersistent() ? JMSDeliveryMode.PERSISTENT :
- JMSDeliveryMode.NON_PERSISTENT;
+ int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
if (_logger.isDebugEnabled())
{
_logger.debug("JMSDeliveryMode is :" + mode);
}
- return mode.toString();
+ return mode;
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -144,7 +144,7 @@ public class BasicConsumeMethodHandler i
_logger.debug("Closing connection due to invalid selector");
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
+ AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
new AMQShortString(ise.getMessage()),
body.getClazz(),
body.getMethod());
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -162,7 +162,14 @@ public class BasicGetMethodHandler imple
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ sub = new GetNoAckSubscription(channel,
+ session,
+ null,
+ null,
+ false,
+ singleMessageCredit,
+ getDeliveryMethod,
+ getRecordMethod);
}
queue.registerSubscription(sub,false);
@@ -173,5 +180,27 @@ public class BasicGetMethodHandler imple
}
+ public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public boolean isTransient()
+ {
+ return true;
+ }
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !getCreditManager().useCreditForMessage(msg.getMessage());
+ }
+
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -68,7 +68,5 @@ public class ConnectionCloseMethodHandle
ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
session.writeFrame(responseBody.generateFrame(channelId));
- session.closeProtocolSession();
-
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.server.handler;
-
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -68,7 +68,7 @@ public class ConnectionSecureOkMethodHan
}
MethodRegistry methodRegistry = session.getMethodRegistry();
AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
- switch (authResult.getStatus())
+ switch (authResult.status)
{
case ERROR:
Exception cause = authResult.getCause();
@@ -88,10 +88,7 @@ public class ConnectionSecureOkMethodHan
disposeSaslServer(session);
break;
case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject()));
- }
+ _logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
ConnectionTuneBody tuneBody =
@@ -99,13 +96,13 @@ public class ConnectionSecureOkMethodHan
ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
- session.setAuthorizedSubject(authResult.getSubject());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
session.writeFrame(secureBody.generateFrame(0));
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org