You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [20/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Oct 21 01:19:00 2011
@@ -20,6 +20,17 @@
  */
 package org.apache.qpid.server;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -27,9 +38,29 @@ import org.apache.commons.cli.OptionBuil
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
-import org.apache.qpid.server.Broker.InitException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.xml.QpidLog4JConfigurator;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.information.management.ServerInformationMBean;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 
 /**
  * Main entry point for AMQPD.
@@ -37,129 +68,41 @@ import org.apache.qpid.server.registry.A
  */
 public class Main
 {
+    private static Logger _logger;
 
-    private static final Option OPTION_HELP = new Option("h", "help", false, "print this message");
+    private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
 
-    private static final Option OPTION_VERSION = new Option("v", "version", false, "print the version information and exit");
+    public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+    public static final String QPID_HOME = "QPID_HOME";
+    private static final int IPV4_ADDRESS_LENGTH = 4;
 
-    private static final Option OPTION_CONFIG_FILE =
-            OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
-                    .create("c");
-
-    private static final Option OPTION_PORT =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified port. Overrides any value in the config file")
-                    .withLongOpt("port").create("p");
-
-    private static final Option OPTION_SSLPORT =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("SSL port. Overrides any value in the config file")
-                    .withLongOpt("sslport").create("s");
-
-    private static final Option OPTION_EXCLUDE_0_10 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-10").create();
-
-    private static final Option OPTION_EXCLUDE_0_9_1 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-9-1").create();
-
-    private static final Option OPTION_EXCLUDE_0_9 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-9").create();
-
-    private static final Option OPTION_EXCLUDE_0_8 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-8").create();
-
-    private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
-                    .withLongOpt("jmxregistryport").create("m");
-
-    private static final Option OPTION_JMX_PORT_CONNECTOR_SERVER =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified management (connector server) port. Overrides any value in the config file")
-                    .withLongOpt("jmxconnectorport").create();
-
-    private static final Option OPTION_BIND =
-            OptionBuilder.withArgName("address").hasArg()
-                    .withDescription("bind to the specified address. Overrides any value in the config file")
-                    .withLongOpt("bind").create("b");
-
-    private static final Option OPTION_LOG_CONFIG_FILE =
-            OptionBuilder.withArgName("file").hasArg()
-                    .withDescription("use the specified log4j xml configuration file. By "
-                                     + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
-                                     + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
-
-    private static final Option OPTION_LOG_WATCH =
-            OptionBuilder.withArgName("period").hasArg()
-                    .withDescription("monitor the log file configuration file for changes. Units are seconds. "
-                                     + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
-
-    private static final Options OPTIONS = new Options();
-
-    static
-    {
-        OPTIONS.addOption(OPTION_HELP);
-        OPTIONS.addOption(OPTION_VERSION);
-        OPTIONS.addOption(OPTION_CONFIG_FILE);
-        OPTIONS.addOption(OPTION_LOG_CONFIG_FILE);
-        OPTIONS.addOption(OPTION_LOG_WATCH);
-        OPTIONS.addOption(OPTION_PORT);
-        OPTIONS.addOption(OPTION_SSLPORT);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_10);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_9);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_8);
-        OPTIONS.addOption(OPTION_BIND);
-
-        OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
-        OPTIONS.addOption(OPTION_JMX_PORT_CONNECTOR_SERVER);
-    }
+    private static final char IPV4_LITERAL_SEPARATOR = '.';
 
-    private CommandLine commandLine;
-
-    public static void main(String[] args)
+    protected static class InitException extends Exception
     {
-        //if the -Dlog4j.configuration property has not been set, enable the init override
-        //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
-        //finds from the classpath when we get the first Loggers
-        if(System.getProperty("log4j.configuration") == null)
+        InitException(String msg, Throwable cause)
         {
-            System.setProperty("log4j.defaultInitOverride", "true");
+            super(msg, cause);
         }
-
-        new Main(args);
     }
 
-    public Main(final String[] args)
+    protected final Options options = new Options();
+    protected CommandLine commandLine;
+
+    protected Main(String[] args)
     {
+        setOptions(options);
         if (parseCommandline(args))
         {
-            try
-            {
-                execute();
-            }
-            catch(Throwable e)
-            {
-                System.err.println("Exception during startup: " + e);
-                e.printStackTrace();
-                shutdown(1);
-            }
+            execute();
         }
     }
 
-    protected boolean parseCommandline(final String[] args)
+    protected boolean parseCommandline(String[] args)
     {
         try
         {
-            commandLine = new PosixParser().parse(OPTIONS, args);
+            commandLine = new PosixParser().parse(options, args);
 
             return true;
         }
@@ -167,129 +110,509 @@ public class Main
         {
             System.err.println("Error: " + e.getMessage());
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp("Qpid", OPTIONS, true);
+            formatter.printHelp("Qpid", options, true);
 
             return false;
         }
     }
 
-    protected void execute() throws Exception
+    @SuppressWarnings("static-access")
+    protected void setOptions(Options options)
+    {
+        Option help = new Option("h", "help", false, "print this message");
+        Option version = new Option("v", "version", false, "print the version information and exit");
+        Option configFile =
+                OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+                        .create("c");
+        Option port =
+                OptionBuilder.withArgName("port").hasArg()
+                        .withDescription("listen on the specified port. Overrides any value in the config file")
+                        .withLongOpt("port").create("p");
+
+        Option exclude0_10 =
+                OptionBuilder.withArgName("exclude-0-10").hasArg()
+                        .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
+                        .withLongOpt("exclude-0-10").create();
+
+        Option exclude0_9_1 =
+                OptionBuilder.withArgName("exclude-0-9-1").hasArg()
+                        .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
+                        .withLongOpt("exclude-0-9-1").create();
+
+
+        Option exclude0_9 =
+                OptionBuilder.withArgName("exclude-0-9").hasArg()
+                        .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
+                        .withLongOpt("exclude-0-9").create();
+
+
+        Option exclude0_8 =
+                OptionBuilder.withArgName("exclude-0-8").hasArg()
+                        .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
+                        .withLongOpt("exclude-0-8").create();
+
+
+        Option mport =
+                OptionBuilder.withArgName("mport").hasArg()
+                        .withDescription("listen on the specified management port. Overrides any value in the config file")
+                        .withLongOpt("mport").create("m");
+
+
+        Option bind =
+                OptionBuilder.withArgName("bind").hasArg()
+                        .withDescription("bind to the specified address. Overrides any value in the config file")
+                        .withLongOpt("bind").create("b");
+        Option logconfig =
+                OptionBuilder.withArgName("logconfig").hasArg()
+                        .withDescription("use the specified log4j xml configuration file. By "
+                                         + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+                                         + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+        Option logwatchconfig =
+                OptionBuilder.withArgName("logwatch").hasArg()
+                        .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+                                         + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+
+        options.addOption(help);
+        options.addOption(version);
+        options.addOption(configFile);
+        options.addOption(logconfig);
+        options.addOption(logwatchconfig);
+        options.addOption(port);
+        options.addOption(exclude0_10);
+        options.addOption(exclude0_9_1);
+        options.addOption(exclude0_9);
+        options.addOption(exclude0_8);
+        options.addOption(mport);
+        options.addOption(bind);
+    }
+
+    protected void execute()
     {
-        BrokerOptions options = new BrokerOptions();
-        String configFile = commandLine.getOptionValue(OPTION_CONFIG_FILE.getOpt());
-        if(configFile != null)
+        // note this understands either --help or -h. If an option only has a long name you can use that but if
+        // an option has a short name and a long name you must use the short name here.
+        if (commandLine.hasOption("h"))
         {
-            options.setConfigFile(configFile);
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("Qpid", options, true);
         }
+        else if (commandLine.hasOption("v"))
+        {
+            String ver = QpidProperties.getVersionString();
 
-        String logWatchConfig = commandLine.getOptionValue(OPTION_LOG_WATCH.getOpt());
-        if(logWatchConfig != null)
+            StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+            boolean first = true;
+            for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
+            {
+                if (first)
+                {
+                    first = false;
+                }
+                else
+                {
+                    protocol.append(", ");
+                }
+
+                protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
+            }
+
+            System.out.println(ver + " (" + protocol + ")");
+        }
+        else
         {
-            options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
+            try
+            {
+                CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
+                startup();
+                CurrentActor.remove();
+            }
+            catch (InitException e)
+            {
+                System.out.println("Initialisation Error : " + e.getMessage());
+                shutdown(1);
+            }
+            catch (Throwable e)
+            {
+                System.out.println("Error initialising message broker: " + e);
+                e.printStackTrace();
+                shutdown(1);
+            }
         }
+    }
+
+    protected void shutdown(int status)
+    {
+        ApplicationRegistry.removeAll();
+        System.exit(status);
+    }
 
-        String logConfig = commandLine.getOptionValue(OPTION_LOG_CONFIG_FILE.getOpt());
-        if(logConfig != null)
+    protected void startup() throws Exception
+    {
+        final String QpidHome = System.getProperty(QPID_HOME);
+        final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+        final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+        if (!configFile.exists())
+        {
+            String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
+
+            if (QpidHome == null)
+            {
+                error = error + "\nNote: " + QPID_HOME + " is not set.";
+            }
+
+            throw new InitException(error, null);
+        }
+        else
         {
-            options.setLogConfigFile(logConfig);
+            CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
         }
 
-        String jmxPortRegistryServer = commandLine.getOptionValue(OPTION_JMX_PORT_REGISTRY_SERVER.getOpt());
-        if(jmxPortRegistryServer != null)
+        String logConfig = commandLine.getOptionValue("l");
+        String logWatchConfig = commandLine.getOptionValue("w", "0");
+
+        int logWatchTime = 0;
+        try
+        {
+            logWatchTime = Integer.parseInt(logWatchConfig);
+        }
+        catch (NumberFormatException e)
         {
-            options.setJmxPortRegistryServer(Integer.parseInt(jmxPortRegistryServer));
+            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+                               + "a non-negative integer. Using default of zero (no watching configured");
         }
 
-        String jmxPortConnectorServer = commandLine.getOptionValue(OPTION_JMX_PORT_CONNECTOR_SERVER.getLongOpt());
-        if(jmxPortConnectorServer != null)
+        File logConfigFile;
+        if (logConfig != null)
+        {
+            logConfigFile = new File(logConfig);
+            configureLogging(logConfigFile, logWatchTime);
+        }
+        else
         {
-            options.setJmxPortConnectorServer(Integer.parseInt(jmxPortConnectorServer));
+            File configFileDirectory = configFile.getParentFile();
+            logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+            configureLogging(logConfigFile, logWatchTime);
         }
 
-        String bindAddr = commandLine.getOptionValue(OPTION_BIND.getOpt());
-        if (bindAddr != null)
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+        ServerConfiguration serverConfig = config.getConfiguration();
+        updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
+
+        ApplicationRegistry.initialise(config);
+
+        // We have already loaded the BrokerMessages class by this point so we
+        // need to refresh the locale setting incase we had a different value in
+        // the configuration.
+        BrokerMessages.reload();
+
+        // AR.initialise() sets and removes its own actor so we now need to set the actor
+        // for the remainder of the startup, and the default actor if the stack is empty
+        CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
+        CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
+        GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
+        
+
+        try
         {
-            options.setBind(bindAddr);
+            configureLoggingManagementMBean(logConfigFile, logWatchTime);
+
+            ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
+            configMBean.register();
+
+            ServerInformationMBean sysInfoMBean =
+                    new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+            sysInfoMBean.register();
+
+
+            String[] portStr = commandLine.getOptionValues("p");
+
+            Set<Integer> ports = new HashSet<Integer>();
+            Set<Integer> exclude_0_10 = new HashSet<Integer>();
+            Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
+            Set<Integer> exclude_0_9 = new HashSet<Integer>();
+            Set<Integer> exclude_0_8 = new HashSet<Integer>();
+
+            if(portStr == null || portStr.length == 0)
+            {
+
+                parsePortList(ports, serverConfig.getPorts());
+                parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+                parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+                parsePortList(exclude_0_9, serverConfig.getPortExclude09());
+                parsePortList(exclude_0_8, serverConfig.getPortExclude08());
+
+            }
+            else
+            {
+                parsePortArray(ports, portStr);
+                parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+                parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
+                parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
+                parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
+
+            }
+
+
+
+
+            String bindAddr = commandLine.getOptionValue("b");
+            if (bindAddr == null)
+            {
+                bindAddr = serverConfig.getBind();
+            }
+            InetAddress bindAddress = null;
+
+
+
+            if (bindAddr.equals("wildcard"))
+            {
+                bindAddress = new InetSocketAddress(0).getAddress();
+            }
+            else
+            {
+                bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+            }
+
+            String hostName = bindAddress.getCanonicalHostName();
+
+
+            String keystorePath = serverConfig.getKeystorePath();
+            String keystorePassword = serverConfig.getKeystorePassword();
+            String certType = serverConfig.getCertType();
+            SSLContextFactory sslFactory = null;
+
+            if (!serverConfig.getSSLOnly())
+            {
+
+                for(int port : ports)
+                {
+
+                    NetworkDriver driver = new MINANetworkDriver();
+
+                    Set<VERSION> supported = EnumSet.allOf(VERSION.class);
+
+                    if(exclude_0_10.contains(port))
+                    {
+                        supported.remove(VERSION.v0_10);
+                    }
+
+                    if(exclude_0_9_1.contains(port))
+                    {
+                        supported.remove(VERSION.v0_9_1);
+                    }
+                    if(exclude_0_9.contains(port))
+                    {
+                        supported.remove(VERSION.v0_9);
+                    }
+                    if(exclude_0_8.contains(port))
+                    {
+                        supported.remove(VERSION.v0_8);
+                    }
+
+                    MultiVersionProtocolEngineFactory protocolEngineFactory =
+                            new MultiVersionProtocolEngineFactory(hostName, supported);
+
+
+
+                    driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
+                                serverConfig.getNetworkConfiguration(), null);
+                    ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+                                                                  new QpidAcceptor(driver,"TCP"));
+                    CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
+
+                }
+
+            }
+
+            if (serverConfig.getEnableSSL())
+            {
+                sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+                            new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
+                        new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
+            }
+
+            CurrentActor.get().message(BrokerMessages.READY());
+
+        }
+        finally
+        {
+            // Startup is complete so remove the AR initialised Startup actor
+            CurrentActor.remove();
         }
 
-        String[] portStr = commandLine.getOptionValues(OPTION_PORT.getOpt());
+
+
+    }
+
+    private void parsePortArray(Set<Integer> ports, String[] portStr)
+            throws InitException
+    {
         if(portStr != null)
         {
-            parsePortArray(options, portStr, false);
-            for(ProtocolExclusion pe : ProtocolExclusion.values())
+            for(int i = 0; i < portStr.length; i++)
             {
-                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+                try
+                {
+                    ports.add(Integer.parseInt(portStr[i]));
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InitException("Invalid port: " + portStr[i], e);
+                }
             }
         }
+    }
 
-        String[] sslPortStr = commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
-        if(sslPortStr != null)
+    private void parsePortList(Set<Integer> output, List input)
+            throws InitException
+    {
+        if(input != null)
         {
-            parsePortArray(options, sslPortStr, true);
-            for(ProtocolExclusion pe : ProtocolExclusion.values())
+            for(Object port : input)
             {
-                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+                try
+                {
+                    output.add(Integer.parseInt(String.valueOf(port)));
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InitException("Invalid port: " + port, e);
+                }
             }
         }
-        
-        startBroker(options);
     }
 
-    protected void startBroker(final BrokerOptions options) throws Exception
+    /**
+     * Update the configuration data with the management port.
+     * @param configuration
+     * @param managementPort The string from the command line
+     */
+    private void updateManagementPort(ServerConfiguration configuration, String managementPort)
     {
-        Broker broker = new Broker();
-        broker.startup(options);
+        if (managementPort != null)
+        {
+            try
+            {
+                configuration.setJMXManagementPort(Integer.parseInt(managementPort));
+            }
+            catch (NumberFormatException e)
+            {
+                _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
+            }
+        }
     }
 
-    protected void shutdown(final int status)
+    public static void main(String[] args)
     {
-        ApplicationRegistry.remove();
-        System.exit(status);
+        //if the -Dlog4j.configuration property has not been set, enable the init override
+        //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
+        //finds from the classpath when we get the first Loggers
+        if(System.getProperty("log4j.configuration") == null)
+        {
+            System.setProperty("log4j.defaultInitOverride", "true");
+        }
+
+        //now that the override status is know, we can instantiate the Loggers
+        _logger = Logger.getLogger(Main.class);
+
+        new Main(args);
     }
 
-    private static void parsePortArray(final BrokerOptions options,final Object[] ports,
-                                       final boolean ssl) throws InitException
+    private byte[] parseIP(String address) throws Exception
     {
-        if(ports != null)
+        char[] literalBuffer = address.toCharArray();
+        int byteCount = 0;
+        int currByte = 0;
+        byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
+        for (int i = 0; i < literalBuffer.length; i++)
         {
-            for(int i = 0; i < ports.length; i++)
+            char currChar = literalBuffer[i];
+            if ((currChar >= '0') && (currChar <= '9'))
+            {
+                currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
+            }
+
+            if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
             {
+                ip[byteCount++] = (byte) currByte;
+                currByte = 0;
+            }
+        }
+
+        if (byteCount != 4)
+        {
+            throw new Exception("Invalid IP address: " + address);
+        }
+        return ip;
+    }
+
+    private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
+    {
+        if (logConfigFile.exists() && logConfigFile.canRead())
+        {
+            CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
+
+            if (logWatchTime > 0)
+            {
+                System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+                                   + logWatchTime + " seconds");
+                // log4j expects the watch interval in milliseconds
                 try
                 {
-                    if(ssl)
-                    {
-                        options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
-                    else
-                    {
-                        options.addPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
+                    QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
                 }
-                catch (NumberFormatException e)
+                catch (Exception e)
                 {
-                    throw new InitException("Invalid port: " + ports[i], e);
+                    throw new InitException(e.getMessage(),e);
+                }
+            }
+            else
+            {
+                try
+                {
+                    QpidLog4JConfigurator.configure(logConfigFile.getPath());
+                }
+                catch (Exception e)
+                {
+                    throw new InitException(e.getMessage(),e);
                 }
             }
         }
-    }
-
-    private static void parsePortArray(final BrokerOptions options, final Object[] ports,
-                                       final ProtocolExclusion excludedProtocol) throws InitException
-    {
-        if(ports != null)
+        else
         {
-            for(int i = 0; i < ports.length; i++)
+            System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+            System.err.println("Using the fallback internal log4j.properties configuration");
+
+            InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
+            if(propsFile == null)
+            {
+                throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
+            }
+            else
             {
                 try
                 {
-                    options.addExcludedPort(excludedProtocol, 
-                            Integer.parseInt(String.valueOf(ports[i])));
+                    Properties fallbackProps = new Properties();
+                    fallbackProps.load(propsFile);
+                    PropertyConfigurator.configure(fallbackProps);
                 }
-                catch (NumberFormatException e)
+                finally
                 {
-                    throw new InitException("Invalid port for exclusion: " + ports[i], e);
+                    propsFile.close();
                 }
             }
         }
     }
+
+    private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
+    {
+        LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
+
+        blm.register();
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Oct 21 01:19:00 2011
@@ -20,8 +20,6 @@
 
 package org.apache.qpid.server.configuration;
 
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,28 +37,33 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.SystemConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.signal.SignalHandlerTask;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
 
-public class ServerConfiguration extends ConfigurationPlugin
+public class ServerConfiguration extends ConfigurationPlugin implements SignalHandler
 {
     protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
 
     // Default Configuration values
-    public static final int DEFAULT_BUFFER_SIZE = 262144;
+    public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
+    public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
+    public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false;
     public static final String DEFAULT_STATUS_UPDATES = "on";
     public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED";
 
     public static final int DEFAULT_FRAME_SIZE = 65536;
     public static final int DEFAULT_PORT = 5672;
-    public static final int DEFAULT_SSL_PORT = 5671;
+    public static final int DEFAULT_SSL_PORT = 8672;
     public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
-    public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
-    public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
-
+    public static final int DEFAULT_JMXPORT = 8999;
+    
     public static final String QPID_HOME = "QPID_HOME";
     public static final String QPID_WORK = "QPID_WORK";
     public static final String LIB_DIR = "lib";
@@ -72,14 +75,19 @@ public class ServerConfiguration extends
     private File _configFile;
     private File _vhostsFile;
 
+    private Logger _log = Logger.getLogger(this.getClass());
+
+    private ConfigurationManagementMBean _mbean;
+
     // Map of environment variables to config items
     private static final Map<String, String> envVarMap = new HashMap<String, String>();
 
     // Configuration values to be read from the configuration file
     //todo Move all properties to static values to ensure system testing can be performed.
+    public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled";
+    public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize";
+    public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize";
     public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
-    public static final String MGMT_JMXPORT_REGISTRYSERVER = "management.jmxport.registryServer";
-    public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
     public static final String STATUS_UPDATES = "status-updates";
     public static final String ADVANCED_LOCALE = "advanced.locale";
 
@@ -87,9 +95,9 @@ public class ServerConfiguration extends
         envVarMap.put("QPID_PORT", "connector.port");
         envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers");
         envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
+        envVarMap.put("QPID_NIO", "connector.qpidnio");
         envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
-        envVarMap.put("QPID_JMXPORT_REGISTRYSERVER", MGMT_JMXPORT_REGISTRYSERVER);
-        envVarMap.put("QPID_JMXPORT_CONNECTORSERVER", MGMT_JMXPORT_CONNECTORSERVER);
+        envVarMap.put("QPID_JMXPORT", "management.jmxport");
         envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
         envVarMap.put("QPID_MSGAUTH", "security.msg-auth");
         envVarMap.put("QPID_AUTOREGISTER", "auto_register");
@@ -123,7 +131,7 @@ public class ServerConfiguration extends
      * Configuration Manager to be initialised in the Application Registry.
      * <p>
      * If using this ServerConfiguration via an ApplicationRegistry there is no
-     * need to explicitly call {@link #initialise()} as this is done via the
+     * need to explictly call {@link #initialise()} as this is done via the
      * {@link ApplicationRegistry#initialise()} method.
      *
      * @param configurationURL
@@ -133,26 +141,15 @@ public class ServerConfiguration extends
     {
         this(parseConfig(configurationURL));
         _configFile = configurationURL;
-
-        SignalHandlerTask hupReparseTask = new SignalHandlerTask()
+        try
         {
-            public void handle()
-            {
-                try
-                {
-                    reparseConfigFileSecuritySections();
-                }
-                catch (ConfigurationException e)
-                {
-                    _logger.error("Could not reload configuration file security sections", e);
-                }
-            }
-        };
-
-        if(!hupReparseTask.register("HUP"))
+            Signal sig = new sun.misc.Signal("HUP");
+            sun.misc.Signal.handle(sig, this);
+        }
+        catch (Exception e)
         {
-            _logger.info("Unable to register Signal HUP handler to reload security configuration.");
-            _logger.info("Signal HUP not supported for this OS / JVM combination - " + SignalHandlerTask.getPlatformDescription());
+            _logger.error("Signal HUP not supported for OS: " + System.getProperty("os.name"));
+            // We're on something that doesn't handle SIGHUP, how sad, Windows.
         }
     }
 
@@ -169,7 +166,7 @@ public class ServerConfiguration extends
      * Configuration Manager to be initialised in the Application Registry.
      * <p>
      * If using this ServerConfiguration via an ApplicationRegistry there is no 
-     * need to explicitly call {@link #initialise()} as this is done via the
+     * need to explictly call {@link #initialise()} as this is done via the
      * {@link ApplicationRegistry#initialise()} method.
      *
      * @param conf
@@ -208,53 +205,7 @@ public class ServerConfiguration extends
     @Override
     public void validateConfiguration() throws ConfigurationException
     {
-        // Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL.
-        // This ensure that users remove the element from their configuration file.
-        
-        if (getListValue("security.jmx.access").size() > 0)
-        {
-            String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml." 
-                    + (_configFile == null ? "" : " Configuration file : " + _configFile);
-            throw new ConfigurationException(message);
-        }
-
-        if (getListValue("security.jmx.principal-database").size() > 0)
-        {
-            String message = "Validation error : security/jmx/principal-database is no longer a supported element within the configuration xml."
-                    + (_configFile == null ? "" : " Configuration file : " + _configFile);
-            throw new ConfigurationException(message);
-        }
-
-        if (getListValue("security.principal-databases.principal-database(0).class").size() > 0)
-        {
-            String message = "Validation error : security/principal-databases is no longer supported within the configuration xml." 
-                    + (_configFile == null ? "" : " Configuration file : " + _configFile);
-            throw new ConfigurationException(message);
-        }
-
-        // QPID-3266.  Tidy up housekeeping configuration option for scheduling frequency
-        if (contains("housekeeping.expiredMessageCheckPeriod"))
-        {
-            String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
-                    + (_configFile == null ? "" : " Configuration file : " + _configFile);
-            throw new ConfigurationException(message);
-        }
-
-        // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
-        // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
-        for (String key : new String[] {"management.ssl.keystorePath",
-                "management.ssl.keystorePassword," +
-                "connector.ssl.keystorePath",
-                "connector.ssl.keystorePassword"})
-        {
-            if (contains(key))
-            {
-                final String deprecatedXpath = key.replaceAll("\\.", "/");
-                final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
-                _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
-                        + (_configFile == null ? "" : " Configuration file : " + _configFile));
-            }
-        }
+        //Currently doesn't do validation
     }
 
     /*
@@ -420,7 +371,7 @@ public class ServerConfiguration extends
     public final static Configuration flatConfig(File file) throws ConfigurationException
     {
         // We have to override the interpolate methods so that
-        // interpolation takes place across the entirety of the
+        // interpolation takes place accross the entirety of the
         // composite configuration. Without doing this each
         // configuration object only interpolates variables defined
         // inside itself.
@@ -447,6 +398,18 @@ public class ServerConfiguration extends
         return _configFile == null ? "" : _configFile.getAbsolutePath();
     }
 
+    public void handle(Signal arg0)
+    {
+        try
+        {
+            reparseConfigFileSecuritySections();
+        }
+        catch (ConfigurationException e)
+        {
+             _logger.error("Could not reload configuration file security sections", e);
+        }
+    }
+
     public void reparseConfigFileSecuritySections() throws ConfigurationException
     {
         if (_configFile != null)
@@ -490,24 +453,14 @@ public class ServerConfiguration extends
         return System.getProperty(QPID_HOME);
     }
 
-    public void setJMXPortRegistryServer(int registryServerPort)
-    {
-        getConfig().setProperty(MGMT_JMXPORT_REGISTRYSERVER, registryServerPort);
-    }
-
-    public int getJMXPortRegistryServer()
+    public void setJMXManagementPort(int mport)
     {
-        return getIntValue(MGMT_JMXPORT_REGISTRYSERVER, DEFAULT_JMXPORT_REGISTRYSERVER);
+        getConfig().setProperty("management.jmxport", mport);
     }
 
-    public void setJMXPortConnectorServer(int connectorServerPort)
+    public int getJMXManagementPort()
     {
-        getConfig().setProperty(MGMT_JMXPORT_CONNECTORSERVER, connectorServerPort);
-    }
-
-    public int getJMXConnectorServerPort()
-    {
-        return getIntValue(MGMT_JMXPORT_CONNECTORSERVER, getJMXPortRegistryServer() + JMXPORT_CONNECTORSERVER_OFFSET);
+        return getIntValue("management.jmxport", DEFAULT_JMXPORT);
     }
 
     public boolean getUseCustomRMISocketFactory()
@@ -550,11 +503,58 @@ public class ServerConfiguration extends
         _virtualHosts.put(config.getName(), config);
     }
 
+    public List<String> getPrincipalDatabaseNames()
+    {
+        return getListValue("security.principal-databases.principal-database.name");
+    }
+
+    public List<String> getPrincipalDatabaseClass()
+    {
+        return getListValue("security.principal-databases.principal-database.class");
+    }
+
+    public List<String> getPrincipalDatabaseAttributeNames(int index)
+    {
+        String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name";
+        return getListValue(name);
+    }
+
+    public List<String> getPrincipalDatabaseAttributeValues(int index)
+    {
+        String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value";
+        return getListValue(name);
+    }
+
+    public List<String> getManagementPrincipalDBs()
+    {
+        return getListValue("security.jmx.principal-database");
+    }
+
+    public List<String> getManagementAccessList()
+    {
+        return getListValue("security.jmx.access");
+    }
+
     public int getFrameSize()
     {
         return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE);
     }
 
+    public boolean getProtectIOEnabled()
+    {
+        return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED);
+    }
+
+    public int getBufferReadLimit()
+    {
+        return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE);
+    }
+
+    public int getBufferWriteLimit()
+    {
+        return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
+    }
+
     public boolean getSynchedClocks()
     {
         return getBooleanValue("advanced.synced-clocks");
@@ -565,10 +565,14 @@ public class ServerConfiguration extends
         return getBooleanValue("security.msg-auth");
     }
 
+    public String getJMXPrincipalDatabase()
+    {
+        return getStringValue("security.jmx.principal-database");
+    }
+
     public String getManagementKeyStorePath()
     {
-        final String fallback = getStringValue("management.ssl.keystorePath");
-        return getStringValue("management.ssl.keyStorePath", fallback);
+        return getStringValue("management.ssl.keyStorePath");
     }
 
     public boolean getManagementSSLEnabled()
@@ -578,8 +582,7 @@ public class ServerConfiguration extends
 
     public String getManagementKeyStorePassword()
     {
-        final String fallback = getStringValue("management.ssl.keystorePassword");
-        return getStringValue("management.ssl.keyStorePassword", fallback);
+        return getStringValue("management.ssl.keyStorePassword");
     }
 
     public boolean getQueueAutoRegister()
@@ -647,14 +650,14 @@ public class ServerConfiguration extends
         return getLongValue("flowResumeCapacity", getCapacity());
     }
 
-    public int getConnectorProcessors()
+    public int getProcessors()
     {
         return getIntValue("connector.processors", 4);
     }
 
     public List getPorts()
     {
-        return getListValue("connector.port", Collections.<Integer>singletonList(DEFAULT_PORT));
+        return getListValue("connector.port", Collections.singletonList(DEFAULT_PORT));
     }
 
     public List getPortExclude010()
@@ -679,17 +682,17 @@ public class ServerConfiguration extends
 
     public String getBind()
     {
-        return getStringValue("connector.bind", WILDCARD_ADDRESS);
+        return getStringValue("connector.bind", "wildcard");
     }
 
     public int getReceiveBufferSize()
     {
-        return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE);
+        return getIntValue("connector.socketReceiveBuffer", 32767);
     }
 
     public int getWriteBufferSize()
     {
-        return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE);
+        return getIntValue("connector.socketWriteBuffer", 32767);
     }
 
     public boolean getTcpNoDelay()
@@ -712,28 +715,31 @@ public class ServerConfiguration extends
         return getBooleanValue("connector.ssl.sslOnly");
     }
 
-    public List getSSLPorts()
+    public int getSSLPort()
     {
-        return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
+        return getIntValue("connector.ssl.port", DEFAULT_SSL_PORT);
     }
 
-    public String getConnectorKeyStorePath()
+    public String getKeystorePath()
     {
-        final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
-        return getStringValue("connector.ssl.keyStorePath", fallback);
+        return getStringValue("connector.ssl.keystorePath", "none");
     }
 
-    public String getConnectorKeyStorePassword()
+    public String getKeystorePassword()
     {
-        final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
-        return getStringValue("connector.ssl.keyStorePassword", fallback);
+        return getStringValue("connector.ssl.keystorePassword", "none");
     }
 
-    public String getConnectorCertType()
+    public String getCertType()
     {
         return getStringValue("connector.ssl.certType", "SunX509");
     }
 
+    public boolean getQpidNIO()
+    {
+        return getBooleanValue("connector.qpidnio");
+    }
+
     public boolean getUseBiasedWrites()
     {
         return getBooleanValue("advanced.useWriteBiasedPool");
@@ -749,44 +755,69 @@ public class ServerConfiguration extends
          getConfig().setProperty("virtualhosts.default", vhost);
     }    
 
-    public void setHousekeepingCheckPeriod(long value)
+    public void setHousekeepingExpiredMessageCheckPeriod(long value)
     {
-        getConfig().setProperty("housekeeping.checkPeriod", value);
+        getConfig().setProperty("housekeeping.expiredMessageCheckPeriod", value);
     }
 
     public long getHousekeepingCheckPeriod()
     {
-        return getLongValue("housekeeping.checkPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+        return getLongValue("housekeeping.checkPeriod",
+                                   getLongValue("housekeeping.expiredMessageCheckPeriod",
+                                                       DEFAULT_HOUSEKEEPING_PERIOD));
     }
 
-    public long getStatisticsSamplePeriod()
+    public NetworkDriverConfiguration getNetworkConfiguration()
     {
-        return getConfig().getLong("statistics.sample.period", 5000L);
-    }
+        return new NetworkDriverConfiguration()
+        {
 
-    public boolean isStatisticsGenerationBrokerEnabled()
-    {
-        return getConfig().getBoolean("statistics.generation.broker", false);
-    }
+            public Integer getTrafficClass()
+            {
+                return null;
+            }
 
-    public boolean isStatisticsGenerationVirtualhostsEnabled()
-    {
-        return getConfig().getBoolean("statistics.generation.virtualhosts", false);
-    }
+            public Boolean getTcpNoDelay()
+            {
+                // Can't call parent getTcpNoDelay since it just calls this one
+                return getBooleanValue("connector.tcpNoDelay", true);
+            }
 
-    public boolean isStatisticsGenerationConnectionsEnabled()
-    {
-        return getConfig().getBoolean("statistics.generation.connections", false);
-    }
+            public Integer getSoTimeout()
+            {
+                return null;
+            }
 
-    public long getStatisticsReportingPeriod()
-    {
-        return getConfig().getLong("statistics.reporting.period", 0L);
-    }
+            public Integer getSoLinger()
+            {
+                return null;
+            }
 
-    public boolean isStatisticsReportResetEnabled()
-    {
-        return getConfig().getBoolean("statistics.reporting.reset", false);
+            public Integer getSendBufferSize()
+            {
+                return getBufferWriteLimit();
+            }
+
+            public Boolean getReuseAddress()
+            {
+                return null;
+            }
+
+            public Integer getReceiveBufferSize()
+            {
+                return getBufferReadLimit();
+            }
+
+            public Boolean getOOBInline()
+            {
+                return null;
+            }
+
+            public Boolean getKeepAlive()
+            {
+                return null;
+            }
+        };
     }
 
     public int getMaxChannelCount()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Oct 21 01:19:00 2011
@@ -86,9 +86,9 @@ public class VirtualHostConfiguration ex
         return _name;
     }
 
-    public long getHousekeepingCheckPeriod()
+    public long getHousekeepingExpiredMessageCheckPeriod()
     {
-        return getLongValue("housekeeping.checkPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
+        return getLongValue("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
     }
 
     public String getAuthenticationDatabase()
@@ -306,45 +306,11 @@ public class VirtualHostConfiguration ex
     @Override
     public void validateConfiguration() throws ConfigurationException
     {
-        // QPID-3249.  Support for specifying authentication name at vhost level is no longer supported.
-        if (getListValue("security.authentication.name").size() > 0)
-        {
-            String message = "Validation error : security/authentication/name is no longer a supported element within the configuration xml."
-                    + " It appears in virtual host definition : " + _name;
-            throw new ConfigurationException(message);
-        }
-
-        // QPID-3266.  Tidy up housekeeping configuration option for scheduling frequency
-        if (contains("housekeeping.expiredMessageCheckPeriod"))
-        {
-            String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
-                    + " It appears in virtual host definition : " + _name;
-            throw new ConfigurationException(message);
-        }
+        //Currently doesn't do validation
     }
 
     public int getHouseKeepingThreadCount()
     {
         return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors());
     }
-
-    public long getTransactionTimeoutOpenWarn()
-    {
-        return getLongValue("transactionTimeout.openWarn", 0L);
-    }
-
-    public long getTransactionTimeoutOpenClose()
-    {
-        return getLongValue("transactionTimeout.openClose", 0L);
-    }
-
-    public long getTransactionTimeoutIdleWarn()
-    {
-        return getLongValue("transactionTimeout.idleWarn", 0L);
-    }
-
-    public long getTransactionTimeoutIdleClose()
-    {
-        return getLongValue("transactionTimeout.idleClose", 0L);
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.ConfigurationManager;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -139,28 +138,10 @@ public abstract class ConfigurationPlugi
             }
         }
 
-        offerRemainingConfigurationToOtherPlugins(path, configuration, elements);
-
-        validateConfiguration();
-    }
-
-    private void offerRemainingConfigurationToOtherPlugins(String path,
-            Configuration configuration, Set<String> elements) throws ConfigurationException
-    {
-        final IApplicationRegistry appRegistry = safeGetApplicationRegistryInstance();
-
-        if (appRegistry == null)
-        {
-            // We see this happen during shutdown due to asynchronous reconfig using IO threads.
-            // Need to remove the responsibility for offering configuration to other class.
-            _logger.info("Cannot offer remaining config to other plugins, can't find app registry");
-            return;
-        }
-
-        final ConfigurationManager configurationManager = appRegistry.getConfigurationManager();
         // Process the elements in the configuration
         for (String element : elements)
         {
+            ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
             Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
 
             String configurationElement = element;
@@ -181,18 +162,8 @@ public abstract class ConfigurationPlugi
                 _pluginConfiguration.put(plugin.getClass().getName(), plugin);
             }
         }
-    }
 
-    private IApplicationRegistry safeGetApplicationRegistryInstance()
-    {
-        try
-        {
-            return ApplicationRegistry.getInstance();
-        }
-        catch (IllegalStateException ise)
-        {
-            return null;
-        }
+        validateConfiguration();
     }
 
     /** Helper method to print out list of keys in a {@link Configuration}. */

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,19 @@
  */
 package org.apache.qpid.server.connection;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 public class ConnectionRegistry implements IConnectionRegistry, Closeable
 {
-    private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
+    private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
 
     private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
 
@@ -42,46 +40,44 @@ public class ConnectionRegistry implemen
     {
         // None required
     }
-
-    /** Close all of the currently open connections. */
-    public void close()
+    
+    public void expireClosedChannels()
     {
-        _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
-        while (!_registry.isEmpty())
+        for (AMQProtocolSession connection : _registry)
         {
-            AMQConnectionModel connection = _registry.get(0);
-            closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+            connection.closeIfLingeringClosedChannels();
         }
     }
 
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+    /** Close all of the currently open connections. */
+    public void close()
     {
-        try
-        {
-            connection.close(cause, message);
-        }
-        catch (TransportException e)
-        {
-            _logger.warn("Error closing connection:" + e.getMessage());
-        }
-        catch (AMQException e)
+        while (!_registry.isEmpty())
         {
-            _logger.warn("Error closing connection:" + e.getMessage());
+            AMQProtocolSession connection = _registry.get(0);
+
+            try
+            {
+                connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
+                                                                         0, 0,
+                                                                         connection.getProtocolOutputConverter().getProtocolMajorVersion(),
+                                                                         connection.getProtocolOutputConverter().getProtocolMinorVersion(),
+                                                                         (Throwable) null), true);
+            }
+            catch (AMQException e)
+            {
+                _logger.warn("Error closing connection:" + e.getMessage());
+            }
         }
     }
 
-    public void registerConnection(AMQConnectionModel connnection)
+    public void registerConnection(AMQProtocolSession connnection)
     {
         _registry.add(connnection);
     }
 
-    public void deregisterConnection(AMQConnectionModel connnection)
+    public void deregisterConnection(AMQProtocolSession connnection)
     {
         _registry.remove(connnection);
     }
-
-    public List<AMQConnectionModel> getConnections()
-    {
-        return new ArrayList<AMQConnectionModel>(_registry);
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Oct 21 01:19:00 2011
@@ -20,23 +20,18 @@
  */
 package org.apache.qpid.server.connection;
 
-import java.util.List;
-
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 
 public interface IConnectionRegistry
 {
+
     public void initialise();
 
     public void close() throws AMQException;
-    
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-    
-    public List<AMQConnectionModel> getConnections();
 
-    public void registerConnection(AMQConnectionModel connnection);
+    public void registerConnection(AMQProtocolSession connnection);
+
+    public void deregisterConnection(AMQProtocolSession connnection);
 
-    public void deregisterConnection(AMQConnectionModel connnection);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Oct 21 01:19:00 2011
@@ -356,7 +356,7 @@ public abstract class AbstractExchange i
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
         final ArrayList<? extends BaseQueue> queues = doRoute(message);
-        if(!queues.isEmpty())
+        if(queues != null && !queues.isEmpty())
         {
             _routedMessageCount.incrementAndGet();
             _routedMessageSize.addAndGet(message.getSize());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java Fri Oct 21 01:19:00 2011
@@ -90,12 +90,12 @@ public abstract class AbstractExchangeMB
 
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_exchange.getName());
+        return _exchange.getNameShortString().toString();
     }
 
     public String getName()
     {
-        return _exchange.getName();
+        return _exchange.getNameShortString().toString();
     }
 
     public String getExchangeType()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Oct 21 01:19:00 2011
@@ -30,12 +30,15 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.ExchangeConfig;
 
 import javax.management.JMException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public interface Exchange extends ExchangeReferrer, ExchangeConfig
 {
@@ -64,12 +67,7 @@ public interface Exchange extends Exchan
 
     void close() throws AMQException;
 
-    /**
-     * Returns a list of queues to which to route this message.   If there are
-     * no queues the empty list must be returned.
-     *
-     * @return list of queues to which to route the message.
-     */
+
     ArrayList<? extends BaseQueue> route(InboundMessage message);
 
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java Fri Oct 21 01:19:00 2011
@@ -274,6 +274,132 @@ public class HeadersParser
         
     }
 
+    public static void main(String[] args) throws AMQFrameDecodingException
+    {        
+
+        FieldTable bindingTable = new FieldTable();
+
+        bindingTable.setString(new AMQShortString("x-match"),"all");
+        bindingTable.setInteger("a",1);
+        bindingTable.setVoid(new AMQShortString("b"));
+        bindingTable.setString("c","");
+        bindingTable.setInteger("d",4);
+        bindingTable.setInteger("e",1);
+
+
+
+        FieldTable bindingTable2 = new FieldTable();
+        bindingTable2.setString(new AMQShortString("x-match"),"all");
+        bindingTable2.setInteger("a",1);
+        bindingTable2.setVoid(new AMQShortString("b"));
+        bindingTable2.setString("c","");
+        bindingTable2.setInteger("d",4);
+        bindingTable2.setInteger("e",1);
+        bindingTable2.setInteger("f",1);
+
+
+        FieldTable table = new FieldTable();
+        table.setInteger("a",1);
+        table.setInteger("b",2);
+        table.setString("c","");
+        table.setInteger("d",4);
+        table.setInteger("e",1);
+        table.setInteger("f",1);
+        table.setInteger("h",1);
+        table.setInteger("i",1);
+        table.setInteger("j",1);
+        table.setInteger("k",1);
+        table.setInteger("l",1);
+
+        org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize());
+        EncodingUtils.writeFieldTableBytes(buffer, table);
+        buffer.flip();
+
+        FieldTable table2 = EncodingUtils.readFieldTable(buffer);
+
+
+
+        FieldTable bindingTable3 = new FieldTable();
+        bindingTable3.setString(new AMQShortString("x-match"),"any");
+        bindingTable3.setInteger("a",1);
+        bindingTable3.setInteger("b",3);
+
+
+        FieldTable bindingTable4 = new FieldTable();
+        bindingTable4.setString(new AMQShortString("x-match"),"any");
+        bindingTable4.setVoid(new AMQShortString("a"));
+
+
+        FieldTable bindingTable5 = new FieldTable();
+        bindingTable5.setString(new AMQShortString("x-match"),"all");
+        bindingTable5.setString(new AMQShortString("h"),"hello");
+
+        for(int i = 0; i < 100; i++)
+        {
+            printMatches(new FieldTable[] {bindingTable5} , table2);
+        }
+
+
+
+    }
+
+
+
+    private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey)
+    {
+        HeadersMatcherDFAState sm = null;
+        Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>();
+
+        HeadersParser parser = new HeadersParser();
+
+        for(int i = 0; i < bindingKeys.length; i++)
+        {
+            HeaderMatcherResult r = new HeaderMatcherResult();
+            resultMap.put(r, bindingKeys[i].toString());
+
+
+            if(i==0)
+            {
+                sm = parser.createStateMachine(bindingKeys[i], r);
+            }
+            else
+            {
+                sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r));
+            }
+        }
+
+        Collection<HeaderMatcherResult> results = null;
+        long beforeTime = System.currentTimeMillis();
+        for(int i = 0; i < 1000000; i++)
+        {
+            routingKey.size();
+
+            assert sm != null;
+            results = sm.match(routingKey);
+
+        }
+        long elapsed = System.currentTimeMillis() - beforeTime;
+        System.out.println("1000000 Iterations took: " + elapsed);
+        Collection<String> resultStrings = new ArrayList<String>();
+
+        assert results != null;
+        for(HeaderMatcherResult result : results)
+        {
+            resultStrings.add(resultMap.get(result));
+        }
+
+        final ArrayList<String> nonMatches = new ArrayList<String>();
+        for(FieldTable key : bindingKeys)
+        {
+            nonMatches.add(key.toString());
+        }
+        nonMatches.removeAll(resultStrings);
+        System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
+
+
+    }
+
+
     public final static class KeyValuePair
     {
         public final HeaderKey _key;

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Fri Oct 21 01:19:00 2011
@@ -38,7 +38,6 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -697,7 +696,7 @@ public class Bridge implements BridgeCon
 
             //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,
@@ -769,7 +768,7 @@ public class Bridge implements BridgeCon
 
           //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java Fri Oct 21 01:19:00 2011
@@ -258,6 +258,7 @@ public class BrokerLink implements LinkC
                     _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
                 }
                 _qpidConnection.setSessionFactory(new SessionFactory());
+                _qpidConnection.setAuthorizationID(_username == null ? "" : _username);
 
                 updateState(State.ESTABLISHING, State.OPERATIONAL);
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Fri Oct 21 01:19:00 2011
@@ -37,8 +37,8 @@ import org.apache.qpid.server.queue.Filt
 public class PropertyExpression implements Expression
 {
     // Constants - defined the same as JMS
-    private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
-
+    private static final int NON_PERSISTENT = 1;
+    private static final int PERSISTENT = 2;
     private static final int DEFAULT_PRIORITY = 4;
 
     private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
@@ -172,14 +172,13 @@ public class PropertyExpression implemen
     {
         public Object evaluate(Filterable message)
         {
-                JMSDeliveryMode mode = message.isPersistent() ? JMSDeliveryMode.PERSISTENT :
-                                                                JMSDeliveryMode.NON_PERSISTENT;
+                int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("JMSDeliveryMode is :" + mode);
                 }
 
-                return mode.toString();
+                return mode;
         }
     }
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -144,7 +144,7 @@ public class BasicConsumeMethodHandler i
                     _logger.debug("Closing connection due to invalid selector");
 
                     MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
+                    AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
                                                                                        new AMQShortString(ise.getMessage()),
                                                                                        body.getClazz(),
                                                                                        body.getMethod());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -162,7 +162,14 @@ public class BasicGetMethodHandler imple
         }
         else
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            sub = new GetNoAckSubscription(channel,
+                                                 session,
+                                                 null,
+                                                 null,
+                                                 false,
+                                                 singleMessageCredit,
+                                                 getDeliveryMethod,
+                                                 getRecordMethod);
         }
 
         queue.registerSubscription(sub,false);
@@ -173,5 +180,27 @@ public class BasicGetMethodHandler imple
 
     }
 
+    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+    {
+        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+                               AMQShortString consumerTag, FieldTable filters,
+                               boolean noLocal, FlowCreditManager creditManager,
+                                   ClientDeliveryMethod deliveryMethod,
+                                   RecordDeliveryMethod recordMethod)
+            throws AMQException
+        {
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+        }
+
+        public boolean isTransient()
+        {
+            return true;
+        }
 
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return !getCreditManager().useCreditForMessage(msg.getMessage());
+        }
+
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -68,7 +68,5 @@ public class ConnectionCloseMethodHandle
         ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
         session.writeFrame(responseBody.generateFrame(channelId));
 
-        session.closeProtocolSession();
-
     }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.server.handler;
 
-
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionCloseBody;
@@ -68,7 +68,7 @@ public class ConnectionSecureOkMethodHan
         }
         MethodRegistry methodRegistry = session.getMethodRegistry();
         AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
-        switch (authResult.getStatus())
+        switch (authResult.status)
         {
             case ERROR:
                 Exception cause = authResult.getCause();
@@ -88,10 +88,7 @@ public class ConnectionSecureOkMethodHan
                 disposeSaslServer(session);
                 break;
             case SUCCESS:
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject()));
-                }
+                _logger.info("Connected as: " + ss.getAuthorizationID());
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
                 ConnectionTuneBody tuneBody =
@@ -99,13 +96,13 @@ public class ConnectionSecureOkMethodHan
                                                                 ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
                                                                 ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
                 session.writeFrame(tuneBody.generateFrame(0));
-                session.setAuthorizedSubject(authResult.getSubject());
+                session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
                 disposeSaslServer(session);                
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
 
-                ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
                 session.writeFrame(secureBody.generateFrame(0));
         }
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org