You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 22:08:13 UTC

svn commit: r1157632 [1/2] - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/protocol/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/security/auth/manager/ main/java/org...

Author: rgodfrey
Date: Sun Aug 14 20:08:12 2011
New Revision: 1157632

URL: http://svn.apache.org/viewvc?rev=1157632&view=rev
Log:
Undoing commits of stuff that was meant for my 1-0 sandbox

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Aug 14 20:08:12 2011
@@ -20,18 +20,6 @@
  */
 package org.apache.qpid.server;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.*;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -40,28 +28,9 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.Broker.InitException;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+
 
 /**
  * Main entry point for AMQPD.
@@ -69,39 +38,41 @@ import org.apache.qpid.transport.network
  */
 public class Main
 {
-    private static Logger _logger;
-
-    private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
-
-    public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-    public static final String QPID_HOME = "QPID_HOME";
-    private static final int IPV4_ADDRESS_LENGTH = 4;
+    private final Options options = new Options();
+    private CommandLine commandLine;
 
-    private static final char IPV4_LITERAL_SEPARATOR = '.';
-    private java.util.logging.Logger FRAME_LOGGER;
-    private java.util.logging.Logger RAW_LOGGER;
-
-    protected static class InitException extends Exception
+    public static void main(String[] args)
     {
-        InitException(String msg, Throwable cause)
+        //if the -Dlog4j.configuration property has not been set, enable the init override
+        //to stop Log4J wandering off and picking up the first log4j.xml/properties file it
+        //finds from the classpath when we get the first Loggers
+        if(System.getProperty("log4j.configuration") == null)
         {
-            super(msg, cause);
+            System.setProperty("log4j.defaultInitOverride", "true");
         }
-    }
 
-    protected final Options options = new Options();
-    protected CommandLine commandLine;
+        new Main(args);
+    }
 
-    protected Main(String[] args)
+    public Main(final String[] args)
     {
         setOptions(options);
         if (parseCommandline(args))
         {
-            execute();
+            try
+            {
+                execute();
+            }
+            catch(Exception e)
+            {
+                System.err.println("Exception during startup: " + e);
+                e.printStackTrace();
+                shutdown(1);
+            }
         }
     }
 
-    protected boolean parseCommandline(String[] args)
+    protected boolean parseCommandline(final String[] args)
     {
         try
         {
@@ -119,8 +90,7 @@ public class Main
         }
     }
 
-    @SuppressWarnings("static-access")
-    protected void setOptions(Options options)
+    protected void setOptions(final Options options)
     {
         Option help = new Option("h", "help", false, "print this message");
         Option version = new Option("v", "version", false, "print the version information and exit");
@@ -164,16 +134,21 @@ public class Main
         Option bind =
                 OptionBuilder.withArgName("bind").hasArg()
                         .withDescription("bind to the specified address. Overrides any value in the config file")
-                        .withLongOpt("bind").create("b");
+                        .withLongOpt("bind").create(BrokerOptions.BIND);
         Option logconfig =
                 OptionBuilder.withArgName("logconfig").hasArg()
                         .withDescription("use the specified log4j xml configuration file. By "
-                                         + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
-                                         + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+                                         + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
+                                         + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG);
         Option logwatchconfig =
                 OptionBuilder.withArgName("logwatch").hasArg()
                         .withDescription("monitor the log file configuration file for changes. Units are seconds. "
-                                         + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+                                         + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH);
+
+        Option sslport =
+                OptionBuilder.withArgName("sslport").hasArg()
+                        .withDescription("SSL port. Overrides any value in the config file")
+                        .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS);
 
         options.addOption(help);
         options.addOption(version);
@@ -187,472 +162,120 @@ public class Main
         options.addOption(exclude0_8);
         options.addOption(mport);
         options.addOption(bind);
+        options.addOption(sslport);
     }
 
-    protected void execute()
+    protected void execute() throws Exception
     {
-        // note this understands either --help or -h. If an option only has a long name you can use that but if
-        // an option has a short name and a long name you must use the short name here.
-        if (commandLine.hasOption("h"))
+        BrokerOptions options = new BrokerOptions();
+        String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG);
+        if(configFile != null)
         {
-            HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp("Qpid", options, true);
+            options.setConfigFile(configFile);
         }
-        else if (commandLine.hasOption("v"))
-        {
-            String ver = QpidProperties.getVersionString();
-
-            StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
-
-            boolean first = true;
-            for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    protocol.append(", ");
-                }
-
-                protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
-
-            }
 
-            System.out.println(ver + " (" + protocol + ")");
-        }
-        else
+        String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH);
+        if(logWatchConfig != null)
         {
-            try
-            {
-                CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
-                startup();
-                CurrentActor.remove();
-            }
-            catch (InitException e)
-            {
-                System.out.println("Initialisation Error : " + e.getMessage());
-                shutdown(1);
-            }
-            catch (Throwable e)
-            {
-                System.out.println("Error initialising message broker: " + e);
-                e.printStackTrace();
-                shutdown(1);
-            }
+            options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
         }
-    }
-
-    protected void shutdown(int status)
-    {
-        ApplicationRegistry.removeAll();
-        System.exit(status);
-    }
-
-    protected void startup() throws Exception
-    {
-
-        FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
-        RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
-
-        final String QpidHome = System.getProperty(QPID_HOME);
-        final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
-        final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
-        if (!configFile.exists())
-        {
-            String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
 
-            if (QpidHome == null)
-            {
-                error = error + "\nNote: " + QPID_HOME + " is not set.";
-            }
-
-            throw new InitException(error, null);
-        }
-        else
+        String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG);
+        if(logConfig != null)
         {
-            CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
+            options.setLogConfigFile(logConfig);
         }
 
-        String logConfig = commandLine.getOptionValue("l");
-        String logWatchConfig = commandLine.getOptionValue("w", "0");
-
-        int logWatchTime = 0;
-        try
-        {
-            logWatchTime = Integer.parseInt(logWatchConfig);
-        }
-        catch (NumberFormatException e)
+        String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT);
+        if(jmxPort != null)
         {
-            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
-                               + "a non-negative integer. Using default of zero (no watching configured");
+            options.setJmxPort(Integer.parseInt(jmxPort));
         }
 
-        File logConfigFile;
-        if (logConfig != null)
-        {
-            logConfigFile = new File(logConfig);
-            configureLogging(logConfigFile, logWatchTime);
-        }
-        else
+        String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND);
+        if (bindAddr != null)
         {
-            File configFileDirectory = configFile.getParentFile();
-            logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
-            configureLogging(logConfigFile, logWatchTime);
+            options.setBind(bindAddr);
         }
 
-        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
-        ServerConfiguration serverConfig = config.getConfiguration();
-        updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
-
-        ApplicationRegistry.initialise(config);
-
-        // We have already loaded the BrokerMessages class by this point so we
-        // need to refresh the locale setting incase we had a different value in
-        // the configuration.
-        BrokerMessages.reload();
-
-        // AR.initialise() sets and removes its own actor so we now need to set the actor
-        // for the remainder of the startup, and the default actor if the stack is empty
-        CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
-        CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
-        GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-        
-
-        try
+        String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS);
+        if(portStr != null)
         {
-            configureLoggingManagementMBean(logConfigFile, logWatchTime);
-
-            ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
-            configMBean.register();
-
-            ServerInformationMBean sysInfoMBean =
-                    new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
-            sysInfoMBean.register();
-
-
-            String[] portStr = commandLine.getOptionValues("p");
-
-            Set<Integer> ports = new HashSet<Integer>();
-            Set<Integer> exclude_0_10 = new HashSet<Integer>();
-            Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
-            Set<Integer> exclude_0_9 = new HashSet<Integer>();
-            Set<Integer> exclude_0_8 = new HashSet<Integer>();
-
-            if(portStr == null || portStr.length == 0)
-            {
-
-                parsePortList(ports, serverConfig.getPorts());
-                parsePortList(exclude_0_10, serverConfig.getPortExclude010());
-                parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
-                parsePortList(exclude_0_9, serverConfig.getPortExclude09());
-                parsePortList(exclude_0_8, serverConfig.getPortExclude08());
-
-            }
-            else
-            {
-                parsePortArray(ports, portStr);
-                parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
-                parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
-                parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
-                parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
-
-            }
-
-
-
-
-            String bindAddr = commandLine.getOptionValue("b");
-            if (bindAddr == null)
-            {
-                bindAddr = serverConfig.getBind();
-            }
-            InetAddress bindAddress = null;
-
-
-
-            if (bindAddr.equals("wildcard"))
+            parsePortArray(options, portStr, false);
+            for(ProtocolExclusion pe : ProtocolExclusion.values())
             {
-                bindAddress = new InetSocketAddress(0).getAddress();
+                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
             }
-            else
-            {
-                bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
-            }
-
-            String hostName = bindAddress.getCanonicalHostName();
-
-
-            String keystorePath = serverConfig.getKeystorePath();
-            String keystorePassword = serverConfig.getKeystorePassword();
-            String certType = serverConfig.getCertType();
-            SSLContextFactory sslFactory = null;
-
-            if (!serverConfig.getSSLOnly())
-            {
-
-                for(int port : ports)
-                {
-
-                    NetworkDriver driver = new MINANetworkDriver();
-
-                    Set<VERSION> supported = EnumSet.allOf(VERSION.class);
-
-                    if(exclude_0_10.contains(port))
-                    {
-                        supported.remove(VERSION.v0_10);
-                    }
-
-                    if(exclude_0_9_1.contains(port))
-                    {
-                        supported.remove(VERSION.v0_9_1);
-                    }
-                    if(exclude_0_9.contains(port))
-                    {
-                        supported.remove(VERSION.v0_9);
-                    }
-                    if(exclude_0_8.contains(port))
-                    {
-                        supported.remove(VERSION.v0_8);
-                    }
-
-                    MultiVersionProtocolEngineFactory protocolEngineFactory =
-                            new MultiVersionProtocolEngineFactory(hostName, supported);
-
-
-
-                    driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
-                                serverConfig.getNetworkConfiguration(), null);
-                    ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
-                                                                  new QpidAcceptor(driver,"TCP"));
-                    CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
-
-                }
-
-            }
-
-            if (serverConfig.getEnableSSL())
-            {
-                sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
-                NetworkDriver driver = new MINANetworkDriver();
-                driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
-                            new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
-                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
-                        new QpidAcceptor(driver,"TCP"));
-                CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
-            }
-
-            CurrentActor.get().message(BrokerMessages.READY());
-
         }
-        finally
-        {
-            // Startup is complete so remove the AR initialised Startup actor
-            CurrentActor.remove();
-        }
-
-
 
-    }
-
-    private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
-    {
-        java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
-        logger.setLevel(Level.FINE);
-        Formatter formatter = new Formatter()
+        String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS);
+        if(sslPortStr != null)
         {
-            @Override
-            public String format(final LogRecord record)
+            parsePortArray(options, sslPortStr, true);
+            for(ProtocolExclusion pe : ProtocolExclusion.values())
             {
-
-                return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
             }
-        };
-        for(Handler handler : logger.getHandlers())
-        {
-            logger.removeHandler(handler);
         }
-        Handler handler = new ConsoleHandler();
-
-        handler.setLevel(Level.FINE);
-        handler.setFormatter(formatter);
-
-        logger.addHandler(handler);
-
-
-        handler = new FileHandler(logFileName, true);
-        handler.setLevel(Level.FINE);
-        handler.setFormatter(formatter);
+        
+        startBroker(options);
+    }
 
-        logger.addHandler(handler);
-        return logger;
+    protected void startBroker(final BrokerOptions options) throws Exception
+    {
+        Broker broker = new Broker();
+        broker.startup(options);
     }
 
-    private void parsePortArray(Set<Integer> ports, String[] portStr)
-            throws InitException
+    protected void shutdown(final int status)
     {
-        if(portStr != null)
-        {
-            for(int i = 0; i < portStr.length; i++)
-            {
-                try
-                {
-                    ports.add(Integer.parseInt(portStr[i]));
-                }
-                catch (NumberFormatException e)
-                {
-                    throw new InitException("Invalid port: " + portStr[i], e);
-                }
-            }
-        }
+        ApplicationRegistry.remove();
+        System.exit(status);
     }
 
-    private void parsePortList(Set<Integer> output, List input)
-            throws InitException
+    private static void parsePortArray(final BrokerOptions options,final Object[] ports,
+                                       final boolean ssl) throws InitException
     {
-        if(input != null)
+        if(ports != null)
         {
-            for(Object port : input)
+            for(int i = 0; i < ports.length; i++)
             {
                 try
                 {
-                    output.add(Integer.parseInt(String.valueOf(port)));
+                    if(ssl)
+                    {
+                        options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
+                    }
+                    else
+                    {
+                        options.addPort(Integer.parseInt(String.valueOf(ports[i])));
+                    }
                 }
                 catch (NumberFormatException e)
                 {
-                    throw new InitException("Invalid port: " + port, e);
+                    throw new InitException("Invalid port: " + ports[i], e);
                 }
             }
         }
     }
 
-    /**
-     * Update the configuration data with the management port.
-     * @param configuration
-     * @param managementPort The string from the command line
-     */
-    private void updateManagementPort(ServerConfiguration configuration, String managementPort)
+    private static void parsePortArray(final BrokerOptions options, final Object[] ports,
+                                       final ProtocolExclusion excludedProtocol) throws InitException
     {
-        if (managementPort != null)
+        if(ports != null)
         {
-            try
-            {
-                configuration.setJMXManagementPort(Integer.parseInt(managementPort));
-            }
-            catch (NumberFormatException e)
-            {
-                _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
-            }
-        }
-    }
-
-    public static void main(String[] args)
-    {
-        //if the -Dlog4j.configuration property has not been set, enable the init override
-        //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
-        //finds from the classpath when we get the first Loggers
-        if(System.getProperty("log4j.configuration") == null)
-        {
-            System.setProperty("log4j.defaultInitOverride", "true");
-        }
-
-        //now that the override status is know, we can instantiate the Loggers
-        _logger = Logger.getLogger(Main.class);
-
-        new Main(args);
-    }
-
-    private byte[] parseIP(String address) throws Exception
-    {
-        char[] literalBuffer = address.toCharArray();
-        int byteCount = 0;
-        int currByte = 0;
-        byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
-        for (int i = 0; i < literalBuffer.length; i++)
-        {
-            char currChar = literalBuffer[i];
-            if ((currChar >= '0') && (currChar <= '9'))
-            {
-                currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
-            }
-
-            if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
-            {
-                ip[byteCount++] = (byte) currByte;
-                currByte = 0;
-            }
-        }
-
-        if (byteCount != 4)
-        {
-            throw new Exception("Invalid IP address: " + address);
-        }
-        return ip;
-    }
-
-    private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
-    {
-        if (logConfigFile.exists() && logConfigFile.canRead())
-        {
-            CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
-
-            if (logWatchTime > 0)
-            {
-                System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
-                                   + logWatchTime + " seconds");
-                // log4j expects the watch interval in milliseconds
-                try
-                {
-                    QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
-                }
-                catch (Exception e)
-                {
-                    throw new InitException(e.getMessage(),e);
-                }
-            }
-            else
+            for(int i = 0; i < ports.length; i++)
             {
                 try
                 {
-                    QpidLog4JConfigurator.configure(logConfigFile.getPath());
+                    options.addExcludedPort(excludedProtocol, 
+                            Integer.parseInt(String.valueOf(ports[i])));
                 }
-                catch (Exception e)
-                {
-                    throw new InitException(e.getMessage(),e);
-                }
-            }
-        }
-        else
-        {
-            System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
-            System.err.println("Using the fallback internal log4j.properties configuration");
-
-            InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
-            if(propsFile == null)
-            {
-                throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
-            }
-            else
-            {
-                try
-                {
-                    Properties fallbackProps = new Properties();
-                    fallbackProps.load(propsFile);
-                    PropertyConfigurator.configure(fallbackProps);
-                }
-                finally
+                catch (NumberFormatException e)
                 {
-                    propsFile.close();
+                    throw new InitException("Invalid port for exclusion: " + ports[i], e);
                 }
             }
         }
     }
-
-    private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
-    {
-        LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
-        blm.register();
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sun Aug 14 20:08:12 2011
@@ -20,56 +20,44 @@
 */
 package org.apache.qpid.server.protocol;
 
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-
-import java.util.Set;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
-    ;
-
-
-    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
-
-    private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+    private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
-    private final Set<VERSION> _supported;
-
+    private final Set<AmqpProtocolVersion> _supported;
 
     public MultiVersionProtocolEngineFactory()
     {
-        this(1, "localhost", ALL_VERSIONS);
-    }
-
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
-    {
-        this(1, fqdn, versions);
+        this("localhost", ALL_VERSIONS);
     }
 
-
     public MultiVersionProtocolEngineFactory(String fqdn)
     {
-        this(1, fqdn, ALL_VERSIONS);
+        this(fqdn, ALL_VERSIONS);
     }
 
-    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
     {
-        _appRegistry = ApplicationRegistry.getInstance(instance);
+        _appRegistry = ApplicationRegistry.getInstance();
         _fqdn = fqdn;
         _supported = supportedVersions;
     }
 
-
-    public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Aug 14 20:08:12 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
     /** null means shared */
     private final AMQShortString _owner;
 
-    private PrincipalHolder _prinicpalHolder;
+    private AuthorizationHolder _authorizationHolder;
 
     private boolean _exclusive = false;
     private AMQSessionModel _exclusiveOwner;
@@ -102,9 +102,7 @@ public class SimpleAMQQueue implements A
 
     protected final QueueEntryList _entries;
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-
-    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+    protected final SubscriptionList _subscriptionList = new SubscriptionList();
 
     private volatile Subscription _exclusiveSubscriber;
 
@@ -188,7 +186,7 @@ public class SimpleAMQQueue implements A
     //TODO : persist creation time
     private long _createTime = System.currentTimeMillis();
     private ConfigurationPlugin _queueConfiguration;
-    private final boolean _isTopic;
+
 
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -234,12 +232,10 @@ public class SimpleAMQQueue implements A
         _exclusive = exclusive;
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
-        _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
+        _arguments = arguments;
 
         _id = virtualHost.getConfigStore().createId();
 
-        _isTopic = arguments != null && arguments.containsKey("topic");
-
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
         _logSubject = new QueueLogSubject(this);
@@ -331,7 +327,7 @@ public class SimpleAMQQueue implements A
     {
         return _exclusive;
     }
-
+    
     public void setExclusive(boolean exclusive) throws AMQException
     {
         _exclusive = exclusive;
@@ -375,14 +371,14 @@ public class SimpleAMQQueue implements A
         return _owner;
     }
 
-    public PrincipalHolder getPrincipalHolder()
+    public AuthorizationHolder getAuthorizationHolder()
     {
-        return _prinicpalHolder;
+        return _authorizationHolder;
     }
 
-    public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
     {
-        _prinicpalHolder = prinicpalHolder;
+        _authorizationHolder = authorizationHolder;
     }
 
 
@@ -406,8 +402,8 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied");
         }
-
-
+        
+        
         if (hasExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
@@ -437,14 +433,14 @@ public class SimpleAMQQueue implements A
                 subscription.setNoLocal(_nolocal);
             }
             _subscriptionList.add(subscription);
-
+            
             //Increment consumerCountHigh if necessary. (un)registerSubscription are both
             //synchronized methods so we don't need additional synchronization here
             if(_counsumerCountHigh.get() < getConsumerCount())
             {
                 _counsumerCountHigh.incrementAndGet();
             }
-
+            
             if (isDeleted())
             {
                 subscription.queueDeleted(this);
@@ -490,11 +486,6 @@ public class SimpleAMQQueue implements A
                 // queue. This is because the delete method uses the subscription set which has just been cleared
                 subscription.queueDeleted(this);
             }
-
-            if(_subscriptionList.size() == 0 && _isTopic)
-            {
-                clearQueue();
-            }
         }
 
     }
@@ -521,10 +512,10 @@ public class SimpleAMQQueue implements A
                 break;
             }
         }
-
+        
         reconfigure();
     }
-
+    
     private void reconfigure()
     {
         //Reconfigure the queue for to reflect this new binding.
@@ -550,7 +541,7 @@ public class SimpleAMQQueue implements A
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
-
+        
         reconfigure();
     }
 
@@ -577,104 +568,101 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
+        incrementTxnEnqueueStats(message);
+        incrementQueueCount();
+        incrementQueueSize(message);
         _totalMessagesReceived.incrementAndGet();
 
 
+        QueueEntry entry;
         Subscription exclusiveSub = _exclusiveSubscriber;
-        if(!_isTopic || _subscriptionList.size()!=0)
-        {
-            incrementTxnEnqueueStats(message);
-            incrementQueueCount();
-            incrementQueueSize(message);
 
-            QueueEntry entry;
+        if (exclusiveSub != null)
+        {
+            exclusiveSub.getSendLock();
 
-            if (exclusiveSub != null)
+            try
             {
-                exclusiveSub.getSendLock();
-
-                try
-                {
-                    entry = _entries.add(message);
+                entry = _entries.add(message);
 
-                    deliverToSubscription(exclusiveSub, entry);
-                }
-                finally
-                {
-                    exclusiveSub.releaseSendLock();
-                }
+                deliverToSubscription(exclusiveSub, entry);
             }
-            else
+            finally
             {
-                entry = _entries.add(message);
-                /*
+                exclusiveSub.releaseSendLock();
+            }
+        }
+        else
+        {
+            entry = _entries.add(message);
+            /*
 
-                iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+            iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
-                 */
-                SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
-                SubscriptionList.SubscriptionNode nextNode = node.getNext();
-                if (nextNode == null)
+             */
+            SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+            SubscriptionList.SubscriptionNode nextNode = node.findNext();
+            if (nextNode == null)
+            {
+                nextNode = _subscriptionList.getHead().findNext();
+            }
+            while (nextNode != null)
+            {
+                if (_subscriptionList.updateMarkedNode(node, nextNode))
                 {
-                    nextNode = _subscriptionList.getHead().getNext();
+                    break;
                 }
-                while (nextNode != null)
+                else
                 {
-                    if (_lastSubscriptionNode.compareAndSet(node, nextNode))
-                    {
-                        break;
-                    }
-                    else
+                    node = _subscriptionList.getMarkedNode();
+                    nextNode = node.findNext();
+                    if (nextNode == null)
                     {
-                        node = _lastSubscriptionNode.get();
-                        nextNode = node.getNext();
-                        if (nextNode == null)
-                        {
-                            nextNode = _subscriptionList.getHead().getNext();
-                        }
+                        nextNode = _subscriptionList.getHead().findNext();
                     }
                 }
+            }
 
-                // always do one extra loop after we believe we've finished
-                // this catches the case where we *just* miss an update
-                int loops = 2;
+            // always do one extra loop after we believe we've finished
+            // this catches the case where we *just* miss an update
+            int loops = 2;
 
-                while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+            while (entry.isAvailable() && loops != 0)
+            {
+                if (nextNode == null)
                 {
-                    if (nextNode == null)
-                    {
-                        loops--;
-                        nextNode = _subscriptionList.getHead();
-                    }
-                    else
-                    {
-                        // if subscription at end, and active, offer
-                        Subscription sub = nextNode.getSubscription();
-                        deliverToSubscription(sub, entry);
-                    }
-                    nextNode = nextNode.getNext();
-
+                    loops--;
+                    nextNode = _subscriptionList.getHead();
+                }
+                else
+                {
+                    // if subscription at end, and active, offer
+                    Subscription sub = nextNode.getSubscription();
+                    deliverToSubscription(sub, entry);
                 }
+                nextNode = nextNode.findNext();
+
             }
+        }
 
 
-            if (!(entry.isAcquired() || entry.isDeleted()))
-            {
-                checkSubscriptionsNotAheadOfDelivery(entry);
+        if (entry.isAvailable())
+        {
+            checkSubscriptionsNotAheadOfDelivery(entry);
 
-                deliverAsync();
-            }
+            deliverAsync();
+        }
 
-            if(_managedObject != null)
-            {
-                _managedObject.checkForNotification(entry.getMessage());
-            }
+        if(_managedObject != null)
+        {
+            _managedObject.checkForNotification(entry.getMessage());
+        }
 
-            if(action != null)
-            {
-                action.onEnqueue(entry);
-            }
+        if(action != null)
+        {
+            action.onEnqueue(entry);
         }
+
     }
 
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -730,20 +718,20 @@ public class SimpleAMQQueue implements A
     {
         getAtomicQueueCount().incrementAndGet();
     }
-
+    
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
         SessionConfig session = message.getSessionConfig();
-
+        
         if(session !=null && session.isTransactional())
         {
             _msgTxnEnqueues.incrementAndGet();
             _byteTxnEnqueues.addAndGet(message.getSize());
         }
     }
-
+    
     private void incrementTxnDequeueStats(QueueEntry entry)
-    {
+    {      
         _msgTxnDequeues.incrementAndGet();
         _byteTxnDequeues.addAndGet(entry.getSize());
     }
@@ -757,40 +745,6 @@ public class SimpleAMQQueue implements A
         incrementUnackedMsgCount();
 
         sub.send(entry);
-
-        if(_isTopic)
-        {
-            if(allSubscriptionsAhead(entry) && entry.acquire())
-            {
-                entry.discard();
-            }
-        }
-    }
-
-    private boolean allSubscriptionsAhead(final QueueEntry entry)
-    {
-        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
-        while(subIter.advance() && !entry.isAcquired())
-        {
-            final Subscription subscription = subIter.getNode().getSubscription();
-            if(!subscription.isClosed())
-            {
-                QueueContext context = (QueueContext) subscription.getQueueContext();
-                if(context != null)
-                {
-                    QueueEntry subnode = context._lastSeenEntry;
-                    if(subnode.compareTo(entry)<0)
-                    {
-                        return false;
-                    }
-                }
-                else
-                {
-                    return false;
-                }
-            }
-        }
-        return true;
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -849,24 +803,6 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void requeue(QueueEntryImpl entry, Subscription subscription)
-    {
-        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
-        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance())
-        {
-            Subscription sub = subscriberIter.getNode().getSubscription();
-
-            // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
-            {
-                updateSubRequeueEntry(sub, entry);
-            }
-        }
-
-        deliverAsync();
-    }
-
     public void dequeue(QueueEntry entry, Subscription sub)
     {
         decrementQueueCount();
@@ -875,7 +811,7 @@ public class SimpleAMQQueue implements A
         {
             _deliveredMessages.decrementAndGet();
         }
-
+        
         if(sub != null && sub.isSessionTransactional())
         {
             incrementTxnDequeueStats(entry);
@@ -932,7 +868,7 @@ public class SimpleAMQQueue implements A
     {
         return _subscriptionList.size();
     }
-
+    
     public int getConsumerCountHigh()
     {
         return _counsumerCountHigh.get();
@@ -1004,7 +940,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node != null && !node.isDeleted())
+            if (node != null && !node.isDispensed())
             {
                 entryList.add(node);
             }
@@ -1108,7 +1044,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && filter.accept(node))
+            if (!node.isDispensed() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -1302,7 +1238,6 @@ public class SimpleAMQQueue implements A
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
-                    && !node.isDeleted()
                     && node.acquire())
                 {
                     dequeueEntry(node);
@@ -1332,7 +1267,7 @@ public class SimpleAMQQueue implements A
         while (noDeletes && queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node);
                 noDeletes = false;
@@ -1342,7 +1277,7 @@ public class SimpleAMQQueue implements A
     }
 
     public long clearQueue() throws AMQException
-    {
+    {         
         return clear(0l);
     }
 
@@ -1353,7 +1288,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
-
+        
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
@@ -1362,7 +1297,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node, txn);
                 if(++count == request)
@@ -1420,7 +1355,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: " + getName());
         }
-
+        
         if (!_deleted.getAndSet(true))
         {
 
@@ -1629,7 +1564,7 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync()
     {
-        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1648,52 +1583,6 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(flusher);
     }
 
-
-    private class Runner implements ReadWriteRunnable
-    {
-        String _name;
-        public Runner(long count)
-        {
-            _name = "QueueRunner-" + count + "-" + _logActor;
-        }
-
-        public void run()
-        {
-            String originalName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName(_name);
-                CurrentActor.set(_logActor);
-
-                processQueue(this);
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
-            }
-            finally
-            {
-                CurrentActor.remove();
-                Thread.currentThread().setName(originalName);
-            }
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-
-        public String toString()
-        {
-            return _name;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         // Access control
@@ -1714,9 +1603,12 @@ public class SimpleAMQQueue implements A
             {
                 sub.getSendLock();
                 atTail = attemptDelivery(sub);
-                if (atTail && getNextAvailableEntry(sub) == null)
+                if (atTail && sub.isAutoClose())
                 {
-                    sub.queueEmpty();
+                    unregisterSubscription(sub);
+
+                    sub.confirmAutoClose();
+
                 }
                 else if (!atTail)
                 {
@@ -1737,7 +1629,6 @@ public class SimpleAMQQueue implements A
         {
             advanceAllSubscriptions();
         }
-
         return atTail;
     }
 
@@ -1760,7 +1651,7 @@ public class SimpleAMQQueue implements A
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && !(node.isAcquired() || node.isDeleted()))
+            if (node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node))
                 {
@@ -1821,7 +1712,7 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
                 if (expired)
                 {
@@ -1850,14 +1741,40 @@ public class SimpleAMQQueue implements A
     }
 
 
-    private void processQueue(Runnable runner) throws AMQException
+    /**
+     * Used by queue Runners to asynchronously deliver messages to consumers.
+     *
+     * A queue Runner is started whenever a state change occurs, e.g when a new
+     * message arrives on the queue and cannot be immediately delivered to a
+     * subscription (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * capable of accepting/delivering all messages then these messages would
+     * otherwise remain on the queue.
+     *
+     * processQueue should be running while there are messages on the queue AND
+     * there are subscriptions that can deliver them. If there are no
+     * subscriptions capable of delivering the remaining messages on the queue
+     * then processQueue should stop to prevent spinning.
+     *
+     * Since processQueue runs in a fixed size Executor, it should not run
+     * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+     * incoming messages may not be able to be scheduled in the thread pool
+     * because all threads are working on clearing down large queues). To solve
+     * this problem, after an arbitrary number of message deliveries the
+     * processQueue job stops iterating, resubmits itself to the executor, and
+     * ends the current instance
+     *
+     * @param runner the Runner to schedule
+     * @throws AMQException
+     */
+    public void processQueue(QueueRunner runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        int extraLoops = 1;
-        long iterations = MAX_ASYNC_DELIVERIES;
+        boolean lastLoop = false;
+        int iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1874,12 +1791,14 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                extraLoops = 1;
+                //further asynchronous delivery is required since the
+                //previous loop. keep going if iteration slicing allows.
+                lastLoop = false;
             }
 
             previousStateChangeCount = stateChangeCount;
-            deliveryIncomplete = _subscriptionList.size() != 0;
-            boolean done;
+            boolean allSubscriptionsDone = true;
+            boolean subscriptionDone;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1889,29 +1808,25 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-
-                    done = attemptDelivery(sub);
-
-                    if (done)
+                    //attempt delivery. returns true if no further delivery currently possible to this sub
+                    subscriptionDone = attemptDelivery(sub);
+                    if (subscriptionDone)
                     {
-                        if (extraLoops == 0)
+                        //close autoClose subscriptions if we are not currently intent on continuing
+                        if (lastLoop && sub.isAutoClose())
                         {
-                            if(getNextAvailableEntry(sub) == null)
-                            {
-                                sub.queueEmpty();
-                            }
-                            deliveryIncomplete = false;
+                            unregisterSubscription(sub);
 
-                        }
-                        else
-                        {
-                            extraLoops--;
+                            sub.confirmAutoClose();
                         }
                     }
                     else
                     {
+                        //this subscription can accept additional deliveries, so we must 
+                        //keep going after this (if iteration slicing allows it)
+                        allSubscriptionsDone = false;
+                        lastLoop = false;
                         iterations--;
-                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1919,10 +1834,34 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
+
+            if(allSubscriptionsDone && lastLoop)
+            {
+                //We have done an extra loop already and there are
+                //again no further delivery attempts possible, only
+                //keep going if state change demands it.
+                deliveryIncomplete = false;
+            }
+            else if(allSubscriptionsDone)
+            {
+                //All subscriptions reported being done, but we have to do
+                //an extra loop if the iterations are not exhausted and
+                //there is still any work to be done
+                deliveryIncomplete = _subscriptionList.size() != 0;
+                lastLoop = true;
+            }
+            else
+            {
+                //some subscriptions can still accept more messages,
+                //keep going if iteration count allows.
+                lastLoop = false;
+                deliveryIncomplete = true;
+            }
+
             _asynchronousRunner.set(null);
         }
 
-        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1942,8 +1881,8 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            // Only process nodes that are not currently deleted
-            if (!node.isDeleted())
+            // Only process nodes that are not currently deleted and not dequeued
+            if (!node.isDispensed())
             {
                 // If the node has exired then aquire it
                 if (node.expired() && node.acquire())
@@ -2207,22 +2146,22 @@ public class SimpleAMQQueue implements A
     {
         return _dequeueSize.get();
     }
-
+    
     public long getByteTxnEnqueues()
     {
         return _byteTxnEnqueues.get();
     }
-
+    
     public long getByteTxnDequeues()
     {
         return _byteTxnDequeues.get();
     }
-
+    
     public long getMsgTxnEnqueues()
     {
         return _msgTxnEnqueues.get();
     }
-
+    
     public long getMsgTxnDequeues()
     {
         return _msgTxnDequeues.get();
@@ -2259,21 +2198,21 @@ public class SimpleAMQQueue implements A
     {
         return _unackedMsgCountHigh.get();
     }
-
+    
     public long getUnackedMessageCount()
     {
         return _unackedMsgCount.get();
     }
-
+    
     public void decrementUnackedMsgCount()
     {
         _unackedMsgCount.decrementAndGet();
     }
-
+    
     private void incrementUnackedMsgCount()
     {
         long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+        
         long unackedMsgCountHigh;
         while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
         {
@@ -2283,4 +2222,9 @@ public class SimpleAMQQueue implements A
             }
         }
     }
+
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Sun Aug 14 20:08:12 2011
@@ -20,20 +20,73 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
-public interface AuthenticationManager extends Closeable, CallbackHanderSource
+/**
+ * Implementations of the AuthenticationManager are responsible for determining
+ * the authenticity of a user's credentials.
+ * 
+ * If the authentication is successful, the manager is responsible for producing a populated
+ * {@link Subject} containing the user's identity and zero or more principals representing
+ * groups to which the user belongs.
+ * <p>
+ * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
+ * the manager.  The {@link #close()} method must reverse this registration.
+ * 
+ */
+public interface AuthenticationManager extends Closeable, Plugin
 {
+    /** The name for the required SASL Server mechanisms */
+    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+
+    /**
+     * Initialise the authentication plugin.
+     *
+     */
+    void initialise();
+
+   /**
+    * Gets the SASL mechanisms known to this manager.
+    *
+    * @return SASL mechanism names, space separated.
+    */
     String getMechanisms();
 
+    /**
+     * Creates a SASL server for the specified mechanism name for the given
+     * fully qualified domain name.
+     *
+     * @param mechanism mechanism name
+     * @param localFQDN domain name
+     *
+     * @return SASL server
+     * @throws SaslException
+     */
     SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
 
+    /**
+     * Authenticates a user using SASL negotiation.
+     *
+     * @param server SASL server
+     * @param response SASL response to process
+     *
+     * @return authentication result
+     */
     AuthenticationResult authenticate(SaslServer server, byte[] response);
 
-
+    /**
+     * Authenticates a user using their username and password.
+     *
+     * @param username username
+     * @param password password
+     *
+     * @return authentication result
+     */
+    AuthenticationResult authenticate(String username, String password);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Sun Aug 14 20:08:12 2011
@@ -20,27 +20,65 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
-import org.apache.log4j.Logger;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.log4j.Logger;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
 import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.SaslServerFactory;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.Sasl;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-import java.security.Security;
 
+/**
+ * Concrete implementation of the AuthenticationManager that determines if supplied
+ * user credentials match those appearing in a PrincipalDatabase.   The implementation
+ * of the PrincipalDatabase is determined from the configuration.
+ * 
+ * This implementation also registers the JMX UserManagement MBean.
+ * 
+ * This plugin expects configuration such as:
+ *
+ * <pre>
+ * &lt;pd-auth-manager&gt;
+ *   &lt;principal-database&gt;
+ *      &lt;class&gt;org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase&lt;/class&gt;
+ *      &lt;attributes&gt;
+ *         &lt;attribute&gt;
+ *              &lt;name&gt;passwordFile&lt;/name&gt;
+ *              &lt;value&gt;${conf}/passwd&lt;/value&gt;
+ *          &lt;/attribute&gt;
+ *      &lt;/attributes&gt;
+ *   &lt;/principal-database&gt;
+ * &lt;/pd-auth-manager&gt;
+ * </pre>
+ */
 public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
 {
     private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
@@ -49,55 +87,109 @@ public class PrincipalDatabaseAuthentica
     private String _mechanisms;
 
     /** Maps from the mechanism to the callback handler to use for handling those requests */
-    private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+    private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
 
     /**
      * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
      * details of the use of these properties. This map is populated during initialisation of each provider.
      */
-    private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+    private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
 
-    private AuthenticationManager _default = null;
-    /** The name for the required SASL Server mechanisms */
-    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+    protected PrincipalDatabase _principalDatabase = null;
 
-    public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception
-    {
-        _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'")
-                     + " PrincipalDatabase authentication manager.");
+    protected AMQUserManagementMBean _mbean = null;
 
-        // Fixme This should be done per Vhost but allowing global hack isn't right but ...
-        // required as authentication is done before Vhost selection
+    public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
+    {
+        public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
+        {
+            final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
 
-        Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+            // If there is no configuration for this plugin then don't load it.
+            if (configuration == null)
+            {
+                _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
+                return null;
+            }
 
+            final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
+            pdam.configure(configuration);
+            pdam.initialise();
+            return pdam;
+        }
 
-        if (name == null || hostConfig == null)
+        public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
         {
-            initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
+            return PrincipalDatabaseAuthenticationManager.class;
         }
-        else
+
+        public String getPluginName()
         {
-            String databaseName = hostConfig.getAuthenticationDatabase();
+            return PrincipalDatabaseAuthenticationManager.class.getName();
+        }
+    };
 
-            if (databaseName == null)
+    public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
+ 
+        public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+        {
+            public List<String> getParentPaths()
             {
-
-                _default = ApplicationRegistry.getInstance().getAuthenticationManager();
-                return;
+                return Arrays.asList("security.pd-auth-manager");
             }
-            else
+
+            public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
             {
-                PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName);
+                final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
+                
+                instance.setConfiguration(path, config);
+                return instance;
+            }
+        };
+
+        public String[] getElementsProcessed()
+        {
+            return new String[] {"principal-database.class",
+                                 "principal-database.attributes.attribute.name",
+                                 "principal-database.attributes.attribute.value"};
+        }
 
-                if (database == null)
-                {
-                    throw new ConfigurationException("Requested database:" + databaseName + " was not found");
-                }
+        public void validateConfiguration() throws ConfigurationException
+        {
+        }
+  
+        public String getPrincipalDatabaseClass()
+        {
+            return _configuration.getString("principal-database.class");
+        }
+  
+        public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
+        {
+            final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
+            final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
+            final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
 
-                initialiseAuthenticationMechanisms(providerMap, database);
+            for (int i = 0; i < argumentNames.size(); i++)
+            {
+                final String argName = argumentNames.get(i);
+                final String argValue = argumentValues.get(i);
+
+                attributes.put(argName, argValue);
             }
+
+            return Collections.unmodifiableMap(attributes);
         }
+    }
+
+    protected PrincipalDatabaseAuthenticationManager()  
+    {
+    }
+
+    public void initialise()
+    {
+        final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+
+        initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
 
         if (providerMap.size() > 0)
         {
@@ -110,33 +202,16 @@ public class PrincipalDatabaseAuthentica
             {
                 _logger.info("Additional SASL providers successfully registered.");
             }
-
         }
         else
         {
             _logger.warn("No additional SASL providers registered.");
         }
 
+        registerManagement();
     }
 
-
-    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception
-    {
-        if (databases.size() > 1)
-        {
-            _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
-        }
-
-        for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
-        {
-            // fixme As the database now provide the mechanisms they support, they will ...
-            // overwrite each other in the map. There should only be one database per vhost.
-            // But currently we must have authentication before vhost definition.
-            initialiseAuthenticationMechanisms(providerMap, entry.getValue());
-        }
-    }
-
-    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception
+    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) 
     {
         if (database == null || database.getMechanisms().size() == 0)
         {
@@ -152,7 +227,6 @@ public class PrincipalDatabaseAuthentica
 
     private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
                                                    Map<String, Class<? extends SaslServerFactory>> providerMap)
-            throws Exception
     {
         if (_mechanisms == null)
         {
@@ -173,43 +247,37 @@ public class PrincipalDatabaseAuthentica
         _logger.info("Initialised " + mechanism + " SASL provider successfully");
     }
 
+    /**
+     * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
+     */
+    public void configure(final ConfigurationPlugin config) throws ConfigurationException
+    {
+        final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
+        final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
+
+        _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
+
+        _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
+
+        configPrincipalDatabase(_principalDatabase, pdamConfig);        
+    }
+
     public String getMechanisms()
     {
-        if (_default != null)
-        {
-            // Use the default AuthenticationManager if present
-            return _default.getMechanisms();
-        }
-        else
-        {
-            return _mechanisms;
-        }
+        return _mechanisms;
     }
 
     public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
     {
-        if (_default != null)
-        {
-            // Use the default AuthenticationManager if present
-            return _default.createSaslServer(mechanism, localFQDN);
-        }
-        else
-        {
-            return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
-                                         _callbackHandlerMap.get(mechanism));
-        }
-
+        return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+                                     _callbackHandlerMap.get(mechanism));
     }
 
+    /**
+     * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
+     */
     public AuthenticationResult authenticate(SaslServer server, byte[] response)
     {
-        // Use the default AuthenticationManager if present
-        if (_default != null)
-        {
-            return _default.authenticate(server, response);
-        }
-
-
         try
         {
             // Process response from the client
@@ -217,7 +285,9 @@ public class PrincipalDatabaseAuthentica
 
             if (server.isComplete())
             {
-                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
+                final Subject subject = new Subject();
+                subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID()));
+                return new AuthenticationResult(subject);
             }
             else
             {
@@ -230,13 +300,164 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
+    /**
+     * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
+     */
+    public AuthenticationResult authenticate(final String username, final String password)
+    {
+        try
+        {
+            if (_principalDatabase.verifyPassword(username, password.toCharArray()))
+            {
+                final Subject subject = new Subject();
+                subject.getPrincipals().add(new UsernamePrincipal(username));
+                return new AuthenticationResult(subject);
+            }
+            else
+            {
+                return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+            }
+        }
+        catch (AccountNotFoundException e)
+        {
+            return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+        }
+    }
+
     public void close()
     {
+        _mechanisms = null;
         Security.removeProvider(PROVIDER_NAME);
+
+        unregisterManagement();
     }
 
-    public CallbackHandler getHandler(String mechanism)
+    private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
     {
-        return _callbackHandlerMap.get(mechanism);
+        try
+        {
+            return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
+        }
+        catch (InstantiationException ie)
+        {
+            throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
+        }
+        catch (IllegalAccessException iae)
+        {
+            throw new ConfigurationException("Cannot access " + pdClazz, iae);
+        }
+        catch (ClassNotFoundException cnfe)
+        {
+            throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+        }
+        catch (ClassCastException cce)
+        {
+            throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
+        }
+    }
+
+    private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
+            throws ConfigurationException
+    {
+
+        final Map<String,String> attributes = config.getPdClassAttributeMap();
+
+        for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+        {
+            final Entry<String, String> nameValuePair = iterator.next();
+            final String methodName = generateSetterName(nameValuePair.getKey());
+            final Method method;
+            try
+            {
+                method = principalDatabase.getClass().getMethod(methodName, String.class);
+            }
+            catch (Exception e)
+            {
+                throw new ConfigurationException("No method " + methodName + " found in class "
+                        + principalDatabase.getClass()
+                        + " hence unable to configure principal database. The method must be public and "
+                        + "have a single String argument with a void return type", e);
+            }
+            try
+            {
+                method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (PropertyException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (InvocationTargetException e)
+            {
+                // QPID-1347..  InvocationTargetException wraps the checked exception thrown from the reflective
+                // method call.  Pull out the underlying message and cause to make these more apparent to the user.
+                throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+            }
+        }
+    }
+
+    private String generateSetterName(String argName) throws ConfigurationException
+    {
+        if ((argName == null) || (argName.length() == 0))
+        {
+            throw new ConfigurationException("Argument names must have length >= 1 character");
+        }
+
+        if (Character.isLowerCase(argName.charAt(0)))
+        {
+            argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+        }
+
+        final String methodName = "set" + argName;
+        return methodName;
+    }
+
+    protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
+    {
+        _principalDatabase = principalDatabase;
+    }
+
+    protected void registerManagement()
+    {
+        try
+        {
+            _logger.info("Registering UserManagementMBean");
+
+            _mbean = new AMQUserManagementMBean();
+            _mbean.setPrincipalDatabase(_principalDatabase);
+            _mbean.register();
+        }
+        catch (Exception e)
+        {
+            _logger.warn("User management disabled as unable to create MBean:", e);
+            _mbean = null;
+        }
+    }
+
+    protected void unregisterManagement()
+    {
+        try
+        {
+            if (_mbean != null)
+            {
+                _logger.info("Unregistering UserManagementMBean");
+                _mbean.unregister();
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.warn("Failed to unregister User management MBean:", e);
+        }
+        finally
+        {
+            _mbean = null;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -45,9 +45,10 @@ public class AmqPlainSaslServerFactory i
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -47,10 +47,11 @@ public class AnonymousSaslServerFactory 
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE) ||
-            props.containsKey(Sasl.POLICY_NOANONYMOUS)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE) ||
+             props.containsKey(Sasl.POLICY_NOANONYMOUS)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -45,9 +45,10 @@ public class PlainSaslServerFactory impl
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org