You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC

svn commit: r1157566 [22/23] - in /qpid: branches/rg-amqp-1-0-sandbox/qpid/java/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml Sun Aug 14 17:14:51 2011
@@ -26,7 +26,7 @@
   <findSubProjects name="broker-plugins" dir="broker-plugins"/>
   <findSubProjects name="management" dir="management" excludes="common,example,tools/qpid-cli"/>
 
-  <property name="modules.core"       value="junit-toolkit common management/common broker client tools"/>
+  <property name="modules.core"       value="junit-toolkit common management/common amqp-1-0-common broker client amqp-1-0-client tools"/>
   <property name="modules.examples"   value="client/example management/example"/>
   <property name="modules.tests"      value="systests perftests integrationtests testkit"/>
   <property name="modules.management" value="${management}"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Sun Aug 14 17:14:51 2011
@@ -233,4 +233,6 @@ public abstract class Method extends Str
         return str.toString();
     }
 
+
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun Aug 14 17:14:51 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.subscription.Subscription;
@@ -65,7 +66,7 @@ public class SubscriptionTestHelper impl
 
     public void setNoLocal(boolean noLocal)
     {
-        
+
     }
 
     public void send(QueueEntry msg)
@@ -168,11 +169,6 @@ public class SubscriptionTestHelper impl
         return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void confirmAutoClose()
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
     public void set(String key, Object value)
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -222,11 +218,6 @@ public class SubscriptionTestHelper impl
         return true;
     }
 
-    public boolean isAutoClose()
-    {
-        return false;
-    }
-
     public Queue<QueueEntry> getPreDeliveryQueue()
     {
         return null;
@@ -286,9 +277,14 @@ public class SubscriptionTestHelper impl
     {
         return key.toString();
     }
-    
+
     public boolean isSessionTransactional()
     {
         return false;
     }
+
+    public void queueEmpty() throws AMQException
+    {
+        //TODO
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Aug 14 17:14:51 2011
@@ -20,6 +20,18 @@
  */
 package org.apache.qpid.server;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.logging.*;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -28,9 +40,28 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.Broker.InitException;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.xml.QpidLog4JConfigurator;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.information.management.ServerInformationMBean;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 
 /**
  * Main entry point for AMQPD.
@@ -38,41 +69,39 @@ import org.apache.qpid.server.registry.A
  */
 public class Main
 {
-    private final Options options = new Options();
-    private CommandLine commandLine;
+    private static Logger _logger;
 
-    public static void main(String[] args)
+    private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+    public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+    public static final String QPID_HOME = "QPID_HOME";
+    private static final int IPV4_ADDRESS_LENGTH = 4;
+
+    private static final char IPV4_LITERAL_SEPARATOR = '.';
+    private java.util.logging.Logger FRAME_LOGGER;
+    private java.util.logging.Logger RAW_LOGGER;
+
+    protected static class InitException extends Exception
     {
-        //if the -Dlog4j.configuration property has not been set, enable the init override
-        //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
-        //finds from the classpath when we get the first Loggers
-        if(System.getProperty("log4j.configuration") == null)
+        InitException(String msg, Throwable cause)
         {
-            System.setProperty("log4j.defaultInitOverride", "true");
+            super(msg, cause);
         }
-
-        new Main(args);
     }
 
-    public Main(final String[] args)
+    protected final Options options = new Options();
+    protected CommandLine commandLine;
+
+    protected Main(String[] args)
     {
         setOptions(options);
         if (parseCommandline(args))
         {
-            try
-            {
-                execute();
-            }
-            catch(Exception e)
-            {
-                System.err.println("Exception during startup: " + e);
-                e.printStackTrace();
-                shutdown(1);
-            }
+            execute();
         }
     }
 
-    protected boolean parseCommandline(final String[] args)
+    protected boolean parseCommandline(String[] args)
     {
         try
         {
@@ -90,7 +119,8 @@ public class Main
         }
     }
 
-    protected void setOptions(final Options options)
+    @SuppressWarnings("static-access")
+    protected void setOptions(Options options)
     {
         Option help = new Option("h", "help", false, "print this message");
         Option version = new Option("v", "version", false, "print the version information and exit");
@@ -134,21 +164,16 @@ public class Main
         Option bind =
                 OptionBuilder.withArgName("bind").hasArg()
                         .withDescription("bind to the specified address. Overrides any value in the config file")
-                        .withLongOpt("bind").create(BrokerOptions.BIND);
+                        .withLongOpt("bind").create("b");
         Option logconfig =
                 OptionBuilder.withArgName("logconfig").hasArg()
                         .withDescription("use the specified log4j xml configuration file. By "
-                                         + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
-                                         + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG);
+                                         + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+                                         + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
         Option logwatchconfig =
                 OptionBuilder.withArgName("logwatch").hasArg()
                         .withDescription("monitor the log file configuration file for changes. Units are seconds. "
-                                         + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH);
-
-        Option sslport =
-                OptionBuilder.withArgName("sslport").hasArg()
-                        .withDescription("SSL port. Overrides any value in the config file")
-                        .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS);
+                                         + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
 
         options.addOption(help);
         options.addOption(version);
@@ -162,120 +187,472 @@ public class Main
         options.addOption(exclude0_8);
         options.addOption(mport);
         options.addOption(bind);
-        options.addOption(sslport);
     }
 
-    protected void execute() throws Exception
+    protected void execute()
     {
-        BrokerOptions options = new BrokerOptions();
-        String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG);
-        if(configFile != null)
+        // note this understands either --help or -h. If an option only has a long name you can use that but if
+        // an option has a short name and a long name you must use the short name here.
+        if (commandLine.hasOption("h"))
         {
-            options.setConfigFile(configFile);
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("Qpid", options, true);
         }
+        else if (commandLine.hasOption("v"))
+        {
+            String ver = QpidProperties.getVersionString();
+
+            StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+            boolean first = true;
+            for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
+            {
+                if (first)
+                {
+                    first = false;
+                }
+                else
+                {
+                    protocol.append(", ");
+                }
+
+                protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
+            }
 
-        String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH);
-        if(logWatchConfig != null)
+            System.out.println(ver + " (" + protocol + ")");
+        }
+        else
         {
-            options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
+            try
+            {
+                CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
+                startup();
+                CurrentActor.remove();
+            }
+            catch (InitException e)
+            {
+                System.out.println("Initialisation Error : " + e.getMessage());
+                shutdown(1);
+            }
+            catch (Throwable e)
+            {
+                System.out.println("Error initialising message broker: " + e);
+                e.printStackTrace();
+                shutdown(1);
+            }
         }
+    }
+
+    protected void shutdown(int status)
+    {
+        ApplicationRegistry.removeAll();
+        System.exit(status);
+    }
+
+    protected void startup() throws Exception
+    {
+
+        FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
+        RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
+
+        final String QpidHome = System.getProperty(QPID_HOME);
+        final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+        final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+        if (!configFile.exists())
+        {
+            String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
 
-        String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG);
-        if(logConfig != null)
+            if (QpidHome == null)
+            {
+                error = error + "\nNote: " + QPID_HOME + " is not set.";
+            }
+
+            throw new InitException(error, null);
+        }
+        else
         {
-            options.setLogConfigFile(logConfig);
+            CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
         }
 
-        String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT);
-        if(jmxPort != null)
+        String logConfig = commandLine.getOptionValue("l");
+        String logWatchConfig = commandLine.getOptionValue("w", "0");
+
+        int logWatchTime = 0;
+        try
+        {
+            logWatchTime = Integer.parseInt(logWatchConfig);
+        }
+        catch (NumberFormatException e)
         {
-            options.setJmxPort(Integer.parseInt(jmxPort));
+            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+                               + "a non-negative integer. Using default of zero (no watching configured");
         }
 
-        String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND);
-        if (bindAddr != null)
+        File logConfigFile;
+        if (logConfig != null)
+        {
+            logConfigFile = new File(logConfig);
+            configureLogging(logConfigFile, logWatchTime);
+        }
+        else
         {
-            options.setBind(bindAddr);
+            File configFileDirectory = configFile.getParentFile();
+            logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+            configureLogging(logConfigFile, logWatchTime);
         }
 
-        String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS);
-        if(portStr != null)
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+        ServerConfiguration serverConfig = config.getConfiguration();
+        updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
+
+        ApplicationRegistry.initialise(config);
+
+        // We have already loaded the BrokerMessages class by this point so we
+        // need to refresh the locale setting incase we had a different value in
+        // the configuration.
+        BrokerMessages.reload();
+
+        // AR.initialise() sets and removes its own actor so we now need to set the actor
+        // for the remainder of the startup, and the default actor if the stack is empty
+        CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
+        CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
+        GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
+        
+
+        try
         {
-            parsePortArray(options, portStr, false);
-            for(ProtocolExclusion pe : ProtocolExclusion.values())
+            configureLoggingManagementMBean(logConfigFile, logWatchTime);
+
+            ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
+            configMBean.register();
+
+            ServerInformationMBean sysInfoMBean =
+                    new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+            sysInfoMBean.register();
+
+
+            String[] portStr = commandLine.getOptionValues("p");
+
+            Set<Integer> ports = new HashSet<Integer>();
+            Set<Integer> exclude_0_10 = new HashSet<Integer>();
+            Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
+            Set<Integer> exclude_0_9 = new HashSet<Integer>();
+            Set<Integer> exclude_0_8 = new HashSet<Integer>();
+
+            if(portStr == null || portStr.length == 0)
             {
-                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+
+                parsePortList(ports, serverConfig.getPorts());
+                parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+                parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+                parsePortList(exclude_0_9, serverConfig.getPortExclude09());
+                parsePortList(exclude_0_8, serverConfig.getPortExclude08());
+
             }
-        }
+            else
+            {
+                parsePortArray(ports, portStr);
+                parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+                parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
+                parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
+                parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
 
-        String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS);
-        if(sslPortStr != null)
-        {
-            parsePortArray(options, sslPortStr, true);
-            for(ProtocolExclusion pe : ProtocolExclusion.values())
+            }
+
+
+
+
+            String bindAddr = commandLine.getOptionValue("b");
+            if (bindAddr == null)
+            {
+                bindAddr = serverConfig.getBind();
+            }
+            InetAddress bindAddress = null;
+
+
+
+            if (bindAddr.equals("wildcard"))
             {
-                parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+                bindAddress = new InetSocketAddress(0).getAddress();
             }
+            else
+            {
+                bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+            }
+
+            String hostName = bindAddress.getCanonicalHostName();
+
+
+            String keystorePath = serverConfig.getKeystorePath();
+            String keystorePassword = serverConfig.getKeystorePassword();
+            String certType = serverConfig.getCertType();
+            SSLContextFactory sslFactory = null;
+
+            if (!serverConfig.getSSLOnly())
+            {
+
+                for(int port : ports)
+                {
+
+                    NetworkDriver driver = new MINANetworkDriver();
+
+                    Set<VERSION> supported = EnumSet.allOf(VERSION.class);
+
+                    if(exclude_0_10.contains(port))
+                    {
+                        supported.remove(VERSION.v0_10);
+                    }
+
+                    if(exclude_0_9_1.contains(port))
+                    {
+                        supported.remove(VERSION.v0_9_1);
+                    }
+                    if(exclude_0_9.contains(port))
+                    {
+                        supported.remove(VERSION.v0_9);
+                    }
+                    if(exclude_0_8.contains(port))
+                    {
+                        supported.remove(VERSION.v0_8);
+                    }
+
+                    MultiVersionProtocolEngineFactory protocolEngineFactory =
+                            new MultiVersionProtocolEngineFactory(hostName, supported);
+
+
+
+                    driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
+                                serverConfig.getNetworkConfiguration(), null);
+                    ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+                                                                  new QpidAcceptor(driver,"TCP"));
+                    CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
+
+                }
+
+            }
+
+            if (serverConfig.getEnableSSL())
+            {
+                sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+                            new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
+                        new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
+            }
+
+            CurrentActor.get().message(BrokerMessages.READY());
+
         }
-        
-        startBroker(options);
+        finally
+        {
+            // Startup is complete so remove the AR initialised Startup actor
+            CurrentActor.remove();
+        }
+
+
+
     }
 
-    protected void startBroker(final BrokerOptions options) throws Exception
+    private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
     {
-        Broker broker = new Broker();
-        broker.startup(options);
+        java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
+        logger.setLevel(Level.FINE);
+        Formatter formatter = new Formatter()
+        {
+            @Override
+            public String format(final LogRecord record)
+            {
+
+                return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+            }
+        };
+        for(Handler handler : logger.getHandlers())
+        {
+            logger.removeHandler(handler);
+        }
+        Handler handler = new ConsoleHandler();
+
+        handler.setLevel(Level.FINE);
+        handler.setFormatter(formatter);
+
+        logger.addHandler(handler);
+
+
+        handler = new FileHandler(logFileName, true);
+        handler.setLevel(Level.FINE);
+        handler.setFormatter(formatter);
+
+        logger.addHandler(handler);
+        return logger;
     }
 
-    protected void shutdown(final int status)
+    private void parsePortArray(Set<Integer> ports, String[] portStr)
+            throws InitException
     {
-        ApplicationRegistry.remove();
-        System.exit(status);
+        if(portStr != null)
+        {
+            for(int i = 0; i < portStr.length; i++)
+            {
+                try
+                {
+                    ports.add(Integer.parseInt(portStr[i]));
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InitException("Invalid port: " + portStr[i], e);
+                }
+            }
+        }
     }
 
-    private static void parsePortArray(final BrokerOptions options,final Object[] ports,
-                                       final boolean ssl) throws InitException
+    private void parsePortList(Set<Integer> output, List input)
+            throws InitException
     {
-        if(ports != null)
+        if(input != null)
         {
-            for(int i = 0; i < ports.length; i++)
+            for(Object port : input)
             {
                 try
                 {
-                    if(ssl)
-                    {
-                        options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
-                    else
-                    {
-                        options.addPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
+                    output.add(Integer.parseInt(String.valueOf(port)));
                 }
                 catch (NumberFormatException e)
                 {
-                    throw new InitException("Invalid port: " + ports[i], e);
+                    throw new InitException("Invalid port: " + port, e);
                 }
             }
         }
     }
 
-    private static void parsePortArray(final BrokerOptions options, final Object[] ports,
-                                       final ProtocolExclusion excludedProtocol) throws InitException
+    /**
+     * Update the configuration data with the management port.
+     * @param configuration
+     * @param managementPort The string from the command line
+     */
+    private void updateManagementPort(ServerConfiguration configuration, String managementPort)
     {
-        if(ports != null)
+        if (managementPort != null)
         {
-            for(int i = 0; i < ports.length; i++)
+            try
+            {
+                configuration.setJMXManagementPort(Integer.parseInt(managementPort));
+            }
+            catch (NumberFormatException e)
             {
+                _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
+            }
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        //if the -Dlog4j.configuration property has not been set, enable the init override
+        //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
+        //finds from the classpath when we get the first Loggers
+        if(System.getProperty("log4j.configuration") == null)
+        {
+            System.setProperty("log4j.defaultInitOverride", "true");
+        }
+
+        //now that the override status is know, we can instantiate the Loggers
+        _logger = Logger.getLogger(Main.class);
+
+        new Main(args);
+    }
+
+    private byte[] parseIP(String address) throws Exception
+    {
+        char[] literalBuffer = address.toCharArray();
+        int byteCount = 0;
+        int currByte = 0;
+        byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
+        for (int i = 0; i < literalBuffer.length; i++)
+        {
+            char currChar = literalBuffer[i];
+            if ((currChar >= '0') && (currChar <= '9'))
+            {
+                currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
+            }
+
+            if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
+            {
+                ip[byteCount++] = (byte) currByte;
+                currByte = 0;
+            }
+        }
+
+        if (byteCount != 4)
+        {
+            throw new Exception("Invalid IP address: " + address);
+        }
+        return ip;
+    }
+
+    private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
+    {
+        if (logConfigFile.exists() && logConfigFile.canRead())
+        {
+            CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
+
+            if (logWatchTime > 0)
+            {
+                System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+                                   + logWatchTime + " seconds");
+                // log4j expects the watch interval in milliseconds
                 try
                 {
-                    options.addExcludedPort(excludedProtocol, 
-                            Integer.parseInt(String.valueOf(ports[i])));
+                    QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
                 }
-                catch (NumberFormatException e)
+                catch (Exception e)
+                {
+                    throw new InitException(e.getMessage(),e);
+                }
+            }
+            else
+            {
+                try
+                {
+                    QpidLog4JConfigurator.configure(logConfigFile.getPath());
+                }
+                catch (Exception e)
                 {
-                    throw new InitException("Invalid port for exclusion: " + ports[i], e);
+                    throw new InitException(e.getMessage(),e);
                 }
             }
         }
+        else
+        {
+            System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+            System.err.println("Using the fallback internal log4j.properties configuration");
+
+            InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
+            if(propsFile == null)
+            {
+                throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
+            }
+            else
+            {
+                try
+                {
+                    Properties fallbackProps = new Properties();
+                    fallbackProps.load(propsFile);
+                    PropertyConfigurator.configure(fallbackProps);
+                }
+                finally
+                {
+                    propsFile.close();
+                }
+            }
+        }
+    }
+
+    private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
+    {
+        LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
+
+        blm.register();
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sun Aug 14 17:14:51 2011
@@ -20,44 +20,56 @@
 */
 package org.apache.qpid.server.protocol;
 
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
-    private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
-    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+    ;
+
+
+    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
+
+    private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
-    private final Set<AmqpProtocolVersion> _supported;
+    private final Set<VERSION> _supported;
+
 
     public MultiVersionProtocolEngineFactory()
     {
-        this("localhost", ALL_VERSIONS);
+        this(1, "localhost", ALL_VERSIONS);
+    }
+
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
+    {
+        this(1, fqdn, versions);
     }
 
+
     public MultiVersionProtocolEngineFactory(String fqdn)
     {
-        this(fqdn, ALL_VERSIONS);
+        this(1, fqdn, ALL_VERSIONS);
     }
 
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
     {
-        _appRegistry = ApplicationRegistry.getInstance();
+        _appRegistry = ApplicationRegistry.getInstance(instance);
         _fqdn = fqdn;
         _supported = supportedVersions;
     }
 
-    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
+
+    public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Aug 14 17:14:51 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
     /** null means shared */
     private final AMQShortString _owner;
 
-    private AuthorizationHolder _authorizationHolder;
+    private PrincipalHolder _prinicpalHolder;
 
     private boolean _exclusive = false;
     private AMQSessionModel _exclusiveOwner;
@@ -102,7 +102,9 @@ public class SimpleAMQQueue implements A
 
     protected final QueueEntryList _entries;
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList();
+    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
 
     private volatile Subscription _exclusiveSubscriber;
 
@@ -186,7 +188,7 @@ public class SimpleAMQQueue implements A
     //TODO : persist creation time
     private long _createTime = System.currentTimeMillis();
     private ConfigurationPlugin _queueConfiguration;
-
+    private final boolean _isTopic;
 
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -232,10 +234,12 @@ public class SimpleAMQQueue implements A
         _exclusive = exclusive;
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
-        _arguments = arguments;
+        _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
 
         _id = virtualHost.getConfigStore().createId();
 
+        _isTopic = arguments != null && arguments.containsKey("topic");
+
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
         _logSubject = new QueueLogSubject(this);
@@ -327,7 +331,7 @@ public class SimpleAMQQueue implements A
     {
         return _exclusive;
     }
-    
+
     public void setExclusive(boolean exclusive) throws AMQException
     {
         _exclusive = exclusive;
@@ -371,14 +375,14 @@ public class SimpleAMQQueue implements A
         return _owner;
     }
 
-    public AuthorizationHolder getAuthorizationHolder()
+    public PrincipalHolder getPrincipalHolder()
     {
-        return _authorizationHolder;
+        return _prinicpalHolder;
     }
 
-    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
+    public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
     {
-        _authorizationHolder = authorizationHolder;
+        _prinicpalHolder = prinicpalHolder;
     }
 
 
@@ -402,8 +406,8 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied");
         }
-        
-        
+
+
         if (hasExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
@@ -433,14 +437,14 @@ public class SimpleAMQQueue implements A
                 subscription.setNoLocal(_nolocal);
             }
             _subscriptionList.add(subscription);
-            
+
             //Increment consumerCountHigh if necessary. (un)registerSubscription are both
             //synchronized methods so we don't need additional synchronization here
             if(_counsumerCountHigh.get() < getConsumerCount())
             {
                 _counsumerCountHigh.incrementAndGet();
             }
-            
+
             if (isDeleted())
             {
                 subscription.queueDeleted(this);
@@ -486,6 +490,11 @@ public class SimpleAMQQueue implements A
                 // queue. This is because the delete method uses the subscription set which has just been cleared
                 subscription.queueDeleted(this);
             }
+
+            if(_subscriptionList.size() == 0 && _isTopic)
+            {
+                clearQueue();
+            }
         }
 
     }
@@ -512,10 +521,10 @@ public class SimpleAMQQueue implements A
                 break;
             }
         }
-        
+
         reconfigure();
     }
-    
+
     private void reconfigure()
     {
         //Reconfigure the queue for to reflect this new binding.
@@ -541,7 +550,7 @@ public class SimpleAMQQueue implements A
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
-        
+
         reconfigure();
     }
 
@@ -568,101 +577,104 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
-        incrementTxnEnqueueStats(message);
-        incrementQueueCount();
-        incrementQueueSize(message);
         _totalMessagesReceived.incrementAndGet();
 
 
-        QueueEntry entry;
         Subscription exclusiveSub = _exclusiveSubscriber;
-
-        if (exclusiveSub != null)
+        if(!_isTopic || _subscriptionList.size()!=0)
         {
-            exclusiveSub.getSendLock();
+            incrementTxnEnqueueStats(message);
+            incrementQueueCount();
+            incrementQueueSize(message);
 
-            try
-            {
-                entry = _entries.add(message);
+            QueueEntry entry;
 
-                deliverToSubscription(exclusiveSub, entry);
-            }
-            finally
+            if (exclusiveSub != null)
             {
-                exclusiveSub.releaseSendLock();
-            }
-        }
-        else
-        {
-            entry = _entries.add(message);
-            /*
-
-            iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+                exclusiveSub.getSendLock();
 
-             */
-            SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
-            SubscriptionList.SubscriptionNode nextNode = node.findNext();
-            if (nextNode == null)
-            {
-                nextNode = _subscriptionList.getHead().findNext();
-            }
-            while (nextNode != null)
-            {
-                if (_subscriptionList.updateMarkedNode(node, nextNode))
+                try
                 {
-                    break;
+                    entry = _entries.add(message);
+
+                    deliverToSubscription(exclusiveSub, entry);
                 }
-                else
+                finally
                 {
-                    node = _subscriptionList.getMarkedNode();
-                    nextNode = node.findNext();
-                    if (nextNode == null)
-                    {
-                        nextNode = _subscriptionList.getHead().findNext();
-                    }
+                    exclusiveSub.releaseSendLock();
                 }
             }
+            else
+            {
+                entry = _entries.add(message);
+                /*
 
-            // always do one extra loop after we believe we've finished
-            // this catches the case where we *just* miss an update
-            int loops = 2;
+                iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
-            while (entry.isAvailable() && loops != 0)
-            {
+                 */
+                SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+                SubscriptionList.SubscriptionNode nextNode = node.getNext();
                 if (nextNode == null)
                 {
-                    loops--;
-                    nextNode = _subscriptionList.getHead();
+                    nextNode = _subscriptionList.getHead().getNext();
                 }
-                else
+                while (nextNode != null)
                 {
-                    // if subscription at end, and active, offer
-                    Subscription sub = nextNode.getSubscription();
-                    deliverToSubscription(sub, entry);
+                    if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+                    {
+                        break;
+                    }
+                    else
+                    {
+                        node = _lastSubscriptionNode.get();
+                        nextNode = node.getNext();
+                        if (nextNode == null)
+                        {
+                            nextNode = _subscriptionList.getHead().getNext();
+                        }
+                    }
                 }
-                nextNode = nextNode.findNext();
 
+                // always do one extra loop after we believe we've finished
+                // this catches the case where we *just* miss an update
+                int loops = 2;
+
+                while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+                {
+                    if (nextNode == null)
+                    {
+                        loops--;
+                        nextNode = _subscriptionList.getHead();
+                    }
+                    else
+                    {
+                        // if subscription at end, and active, offer
+                        Subscription sub = nextNode.getSubscription();
+                        deliverToSubscription(sub, entry);
+                    }
+                    nextNode = nextNode.getNext();
+
+                }
             }
-        }
 
 
-        if (entry.isAvailable())
-        {
-            checkSubscriptionsNotAheadOfDelivery(entry);
+            if (!(entry.isAcquired() || entry.isDeleted()))
+            {
+                checkSubscriptionsNotAheadOfDelivery(entry);
 
-            deliverAsync();
-        }
+                deliverAsync();
+            }
 
-        if(_managedObject != null)
-        {
-            _managedObject.checkForNotification(entry.getMessage());
-        }
+            if(_managedObject != null)
+            {
+                _managedObject.checkForNotification(entry.getMessage());
+            }
 
-        if(action != null)
-        {
-            action.onEnqueue(entry);
+            if(action != null)
+            {
+                action.onEnqueue(entry);
+            }
         }
-
     }
 
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -718,20 +730,20 @@ public class SimpleAMQQueue implements A
     {
         getAtomicQueueCount().incrementAndGet();
     }
-    
+
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
         SessionConfig session = message.getSessionConfig();
-        
+
         if(session !=null && session.isTransactional())
         {
             _msgTxnEnqueues.incrementAndGet();
             _byteTxnEnqueues.addAndGet(message.getSize());
         }
     }
-    
+
     private void incrementTxnDequeueStats(QueueEntry entry)
-    {      
+    {
         _msgTxnDequeues.incrementAndGet();
         _byteTxnDequeues.addAndGet(entry.getSize());
     }
@@ -745,6 +757,40 @@ public class SimpleAMQQueue implements A
         incrementUnackedMsgCount();
 
         sub.send(entry);
+
+        if(_isTopic)
+        {
+            if(allSubscriptionsAhead(entry) && entry.acquire())
+            {
+                entry.discard();
+            }
+        }
+    }
+
+    private boolean allSubscriptionsAhead(final QueueEntry entry)
+    {
+        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+        while(subIter.advance() && !entry.isAcquired())
+        {
+            final Subscription subscription = subIter.getNode().getSubscription();
+            if(!subscription.isClosed())
+            {
+                QueueContext context = (QueueContext) subscription.getQueueContext();
+                if(context != null)
+                {
+                    QueueEntry subnode = context._lastSeenEntry;
+                    if(subnode.compareTo(entry)<0)
+                    {
+                        return false;
+                    }
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+        return true;
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -803,6 +849,24 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public void requeue(QueueEntryImpl entry, Subscription subscription)
+    {
+        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+        while (subscriberIter.advance())
+        {
+            Subscription sub = subscriberIter.getNode().getSubscription();
+
+            // we don't make browsers send the same stuff twice
+            if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
+            {
+                updateSubRequeueEntry(sub, entry);
+            }
+        }
+
+        deliverAsync();
+    }
+
     public void dequeue(QueueEntry entry, Subscription sub)
     {
         decrementQueueCount();
@@ -811,7 +875,7 @@ public class SimpleAMQQueue implements A
         {
             _deliveredMessages.decrementAndGet();
         }
-        
+
         if(sub != null && sub.isSessionTransactional())
         {
             incrementTxnDequeueStats(entry);
@@ -868,7 +932,7 @@ public class SimpleAMQQueue implements A
     {
         return _subscriptionList.size();
     }
-    
+
     public int getConsumerCountHigh()
     {
         return _counsumerCountHigh.get();
@@ -940,7 +1004,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node != null && !node.isDispensed())
+            if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
             }
@@ -1044,7 +1108,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDispensed() && filter.accept(node))
+            if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -1238,6 +1302,7 @@ public class SimpleAMQQueue implements A
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
+                    && !node.isDeleted()
                     && node.acquire())
                 {
                     dequeueEntry(node);
@@ -1267,7 +1332,7 @@ public class SimpleAMQQueue implements A
         while (noDeletes && queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node.acquire())
+            if (!node.isDeleted() && node.acquire())
             {
                 dequeueEntry(node);
                 noDeletes = false;
@@ -1277,7 +1342,7 @@ public class SimpleAMQQueue implements A
     }
 
     public long clearQueue() throws AMQException
-    {         
+    {
         return clear(0l);
     }
 
@@ -1288,7 +1353,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
-        
+
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
@@ -1297,7 +1362,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node.acquire())
+            if (!node.isDeleted() && node.acquire())
             {
                 dequeueEntry(node, txn);
                 if(++count == request)
@@ -1355,7 +1420,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: " + getName());
         }
-        
+
         if (!_deleted.getAndSet(true))
         {
 
@@ -1564,7 +1629,7 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync()
     {
-        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1583,6 +1648,52 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(flusher);
     }
 
+
+    private class Runner implements ReadWriteRunnable
+    {
+        String _name;
+        public Runner(long count)
+        {
+            _name = "QueueRunner-" + count + "-" + _logActor;
+        }
+
+        public void run()
+        {
+            String originalName = Thread.currentThread().getName();
+            try
+            {
+                Thread.currentThread().setName(_name);
+                CurrentActor.set(_logActor);
+
+                processQueue(this);
+            }
+            catch (AMQException e)
+            {
+                _logger.error(e);
+            }
+            finally
+            {
+                CurrentActor.remove();
+                Thread.currentThread().setName(originalName);
+            }
+        }
+
+        public boolean isRead()
+        {
+            return false;
+        }
+
+        public boolean isWrite()
+        {
+            return true;
+        }
+
+        public String toString()
+        {
+            return _name;
+        }
+    }
+
     public void flushSubscription(Subscription sub) throws AMQException
     {
         // Access control
@@ -1603,12 +1714,9 @@ public class SimpleAMQQueue implements A
             {
                 sub.getSendLock();
                 atTail = attemptDelivery(sub);
-                if (atTail && sub.isAutoClose())
+                if (atTail && getNextAvailableEntry(sub) == null)
                 {
-                    unregisterSubscription(sub);
-
-                    sub.confirmAutoClose();
-
+                    sub.queueEmpty();
                 }
                 else if (!atTail)
                 {
@@ -1629,6 +1737,7 @@ public class SimpleAMQQueue implements A
         {
             advanceAllSubscriptions();
         }
+
         return atTail;
     }
 
@@ -1651,7 +1760,7 @@ public class SimpleAMQQueue implements A
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && node.isAvailable())
+            if (node != null && !(node.isAcquired() || node.isDeleted()))
             {
                 if (sub.hasInterest(node))
                 {
@@ -1712,7 +1821,7 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
                 if (expired)
                 {
@@ -1741,40 +1850,14 @@ public class SimpleAMQQueue implements A
     }
 
 
-    /**
-     * Used by queue Runners to asynchronously deliver messages to consumers.
-     *
-     * A queue Runner is started whenever a state change occurs, e.g when a new
-     * message arrives on the queue and cannot be immediately delivered to a
-     * subscription (i.e. asynchronous delivery is required). Unless there are
-     * SubFlushRunners operating (due to subscriptions unsuspending) which are
-     * capable of accepting/delivering all messages then these messages would
-     * otherwise remain on the queue.
-     *
-     * processQueue should be running while there are messages on the queue AND
-     * there are subscriptions that can deliver them. If there are no
-     * subscriptions capable of delivering the remaining messages on the queue
-     * then processQueue should stop to prevent spinning.
-     *
-     * Since processQueue is runs in a fixed size Executor, it should not run
-     * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
-     * incoming messages may not be able to be scheduled in the thread pool
-     * because all threads are working on clearing down large queues). To solve
-     * this problem, after an arbitrary number of message deliveries the
-     * processQueue job stops iterating, resubmits itself to the executor, and
-     * ends the current instance
-     *
-     * @param runner the Runner to schedule
-     * @throws AMQException
-     */
-    public void processQueue(QueueRunner runner) throws AMQException
+    private void processQueue(Runnable runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        boolean lastLoop = false;
-        int iterations = MAX_ASYNC_DELIVERIES;
+        int extraLoops = 1;
+        long iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1791,14 +1874,12 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                //further asynchronous delivery is required since the
-                //previous loop. keep going if iteration slicing allows.
-                lastLoop = false;
+                extraLoops = 1;
             }
 
             previousStateChangeCount = stateChangeCount;
-            boolean allSubscriptionsDone = true;
-            boolean subscriptionDone;
+            deliveryIncomplete = _subscriptionList.size() != 0;
+            boolean done;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1808,25 +1889,29 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-                    //attempt delivery. returns true if no further delivery currently possible to this sub
-                    subscriptionDone = attemptDelivery(sub);
-                    if (subscriptionDone)
+
+                    done = attemptDelivery(sub);
+
+                    if (done)
                     {
-                        //close autoClose subscriptions if we are not currently intent on continuing
-                        if (lastLoop && sub.isAutoClose())
+                        if (extraLoops == 0)
                         {
-                            unregisterSubscription(sub);
+                            if(getNextAvailableEntry(sub) == null)
+                            {
+                                sub.queueEmpty();
+                            }
+                            deliveryIncomplete = false;
 
-                            sub.confirmAutoClose();
+                        }
+                        else
+                        {
+                            extraLoops--;
                         }
                     }
                     else
                     {
-                        //this subscription can accept additional deliveries, so we must 
-                        //keep going after this (if iteration slicing allows it)
-                        allSubscriptionsDone = false;
-                        lastLoop = false;
                         iterations--;
+                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1834,34 +1919,10 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
-
-            if(allSubscriptionsDone && lastLoop)
-            {
-                //We have done an extra loop already and there are again
-                //again no further delivery attempts possible, only
-                //keep going if state change demands it.
-                deliveryIncomplete = false;
-            }
-            else if(allSubscriptionsDone)
-            {
-                //All subscriptions reported being done, but we have to do
-                //an extra loop if the iterations are not exhausted and
-                //there is still any work to be done
-                deliveryIncomplete = _subscriptionList.size() != 0;
-                lastLoop = true;
-            }
-            else
-            {
-                //some subscriptions can still accept more messages,
-                //keep going if iteration count allows.
-                lastLoop = false;
-                deliveryIncomplete = true;
-            }
-
             _asynchronousRunner.set(null);
         }
 
-        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
+        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1881,8 +1942,8 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            // Only process nodes that are not currently deleted and not dequeued
-            if (!node.isDispensed())
+            // Only process nodes that are not currently deleted
+            if (!node.isDeleted())
             {
                 // If the node has exired then aquire it
                 if (node.expired() && node.acquire())
@@ -2146,22 +2207,22 @@ public class SimpleAMQQueue implements A
     {
         return _dequeueSize.get();
     }
-    
+
     public long getByteTxnEnqueues()
     {
         return _byteTxnEnqueues.get();
     }
-    
+
     public long getByteTxnDequeues()
     {
         return _byteTxnDequeues.get();
     }
-    
+
     public long getMsgTxnEnqueues()
     {
         return _msgTxnEnqueues.get();
     }
-    
+
     public long getMsgTxnDequeues()
     {
         return _msgTxnDequeues.get();
@@ -2198,21 +2259,21 @@ public class SimpleAMQQueue implements A
     {
         return _unackedMsgCountHigh.get();
     }
-    
+
     public long getUnackedMessageCount()
     {
         return _unackedMsgCount.get();
     }
-    
+
     public void decrementUnackedMsgCount()
     {
         _unackedMsgCount.decrementAndGet();
     }
-    
+
     private void incrementUnackedMsgCount()
     {
         long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-        
+
         long unackedMsgCountHigh;
         while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
         {
@@ -2222,9 +2283,4 @@ public class SimpleAMQQueue implements A
             }
         }
     }
-
-    public LogActor getLogActor()
-    {
-        return _logActor;
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Sun Aug 14 17:14:51 2011
@@ -20,73 +20,20 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
-import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
 import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
-/**
- * Implementations of the AuthenticationManager are responsible for determining
- * the authenticity of a user's credentials.
- * 
- * If the authentication is successful, the manager is responsible for producing a populated
- * {@link Subject} containing the user's identity and zero or more principals representing
- * groups to which the user belongs.
- * <p>
- * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
- * the manager.  The {@link #close()} method must reverse this registration.
- * 
- */
-public interface AuthenticationManager extends Closeable, Plugin
+public interface AuthenticationManager extends Closeable, CallbackHanderSource
 {
-    /** The name for the required SASL Server mechanisms */
-    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
-
-    /**
-     * Initialise the authentication plugin.
-     *
-     */
-    void initialise();
-
-   /**
-    * Gets the SASL mechanisms known to this manager.
-    *
-    * @return SASL mechanism names, space separated.
-    */
     String getMechanisms();
 
-    /**
-     * Creates a SASL server for the specified mechanism name for the given
-     * fully qualified domain name.
-     *
-     * @param mechanism mechanism name
-     * @param localFQDN domain name
-     *
-     * @return SASL server
-     * @throws SaslException
-     */
     SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
 
-    /**
-     * Authenticates a user using SASL negotiation.
-     *
-     * @param server SASL server
-     * @param response SASL response to process
-     *
-     * @return authentication result
-     */
     AuthenticationResult authenticate(SaslServer server, byte[] response);
 
-    /**
-     * Authenticates a user using their username and password.
-     *
-     * @param username username
-     * @param password password
-     *
-     * @return authentication result
-     */
-    AuthenticationResult authenticate(String username, String password);
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Sun Aug 14 17:14:51 2011
@@ -20,65 +20,27 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.Security;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.AccountNotFoundException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
-
+import org.apache.log4j.Logger;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.configuration.PropertyException;
-import org.apache.qpid.configuration.PropertyUtils;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
-import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
 import org.apache.qpid.server.security.auth.sasl.JCAProvider;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
 
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.security.Security;
 
-/**
- * Concrete implementation of the AuthenticationManager that determines if supplied
- * user credentials match those appearing in a PrincipalDatabase.   The implementation
- * of the PrincipalDatabase is determined from the configuration.
- * 
- * This implementation also registers the JMX UserManagemement MBean.
- * 
- * This plugin expects configuration such as:
- *
- * <pre>
- * &lt;pd-auth-manager&gt;
- *   &lt;principal-database&gt;
- *      &lt;class&gt;org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase&lt;/class&gt;
- *      &lt;attributes&gt;
- *         &lt;attribute&gt;
- *              &lt;name>passwordFile&lt;/name&gt;
- *              &lt;value>${conf}/passwd&lt;/value&gt;
- *          &lt;/attribute&gt;
- *      &lt;/attributes&gt;
- *   &lt;/principal-database&gt;
- * &lt;/pd-auth-manager&gt;
- * </pre>
- */
 public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
 {
     private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
@@ -87,109 +49,55 @@ public class PrincipalDatabaseAuthentica
     private String _mechanisms;
 
     /** Maps from the mechanism to the callback handler to use for handling those requests */
-    private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+    private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
 
     /**
      * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
      * details of the use of these properties. This map is populated during initialisation of each provider.
      */
-    private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
-
-    protected PrincipalDatabase _principalDatabase = null;
+    private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
 
-    protected AMQUserManagementMBean _mbean = null;
+    private AuthenticationManager _default = null;
+    /** The name for the required SASL Server mechanisms */
+    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
 
-    public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
+    public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception
     {
-        public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
-        {
-            final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+        _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'")
+                     + " PrincipalDatabase authentication manager.");
 
-            // If there is no configuration for this plugin then don't load it.
-            if (configuration == null)
-            {
-                _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
-                return null;
-            }
+        // Fixme This should be done per Vhost but allowing global hack isn't right but ...
+        // required as authentication is done before Vhost selection
 
-            final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
-            pdam.configure(configuration);
-            pdam.initialise();
-            return pdam;
-        }
+        Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
 
-        public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
-        {
-            return PrincipalDatabaseAuthenticationManager.class;
-        }
 
-        public String getPluginName()
+        if (name == null || hostConfig == null)
         {
-            return PrincipalDatabaseAuthenticationManager.class.getName();
+            initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
         }
-    };
-
-    public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
- 
-        public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+        else
         {
-            public List<String> getParentPaths()
-            {
-                return Arrays.asList("security.pd-auth-manager");
-            }
+            String databaseName = hostConfig.getAuthenticationDatabase();
 
-            public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
+            if (databaseName == null)
             {
-                final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
-                
-                instance.setConfiguration(path, config);
-                return instance;
-            }
-        };
-
-        public String[] getElementsProcessed()
-        {
-            return new String[] {"principal-database.class",
-                                 "principal-database.attributes.attribute.name",
-                                 "principal-database.attributes.attribute.value"};
-        }
-
-        public void validateConfiguration() throws ConfigurationException
-        {
-        }
-  
-        public String getPrincipalDatabaseClass()
-        {
-            return _configuration.getString("principal-database.class");
-        }
-  
-        public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
-        {
-            final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
-            final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
-            final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
 
-            for (int i = 0; i < argumentNames.size(); i++)
+                _default = ApplicationRegistry.getInstance().getAuthenticationManager();
+                return;
+            }
+            else
             {
-                final String argName = argumentNames.get(i);
-                final String argValue = argumentValues.get(i);
+                PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName);
 
-                attributes.put(argName, argValue);
-            }
+                if (database == null)
+                {
+                    throw new ConfigurationException("Requested database:" + databaseName + " was not found");
+                }
 
-            return Collections.unmodifiableMap(attributes);
+                initialiseAuthenticationMechanisms(providerMap, database);
+            }
         }
-    }
-
-    protected PrincipalDatabaseAuthenticationManager()  
-    {
-    }
-
-    public void initialise()
-    {
-        final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
-
-        initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
 
         if (providerMap.size() > 0)
         {
@@ -202,16 +110,33 @@ public class PrincipalDatabaseAuthentica
             {
                 _logger.info("Additional SASL providers successfully registered.");
             }
+
         }
         else
         {
             _logger.warn("No additional SASL providers registered.");
         }
 
-        registerManagement();
     }
 
-    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) 
+
+    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception
+    {
+        if (databases.size() > 1)
+        {
+            _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
+        }
+
+        for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
+        {
+            // fixme As the database now provide the mechanisms they support, they will ...
+            // overwrite each other in the map. There should only be one database per vhost.
+            // But currently we must have authentication before vhost definition.
+            initialiseAuthenticationMechanisms(providerMap, entry.getValue());
+        }
+    }
+
+    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception
     {
         if (database == null || database.getMechanisms().size() == 0)
         {
@@ -227,6 +152,7 @@ public class PrincipalDatabaseAuthentica
 
     private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
                                                    Map<String, Class<? extends SaslServerFactory>> providerMap)
+            throws Exception
     {
         if (_mechanisms == null)
         {
@@ -247,217 +173,70 @@ public class PrincipalDatabaseAuthentica
         _logger.info("Initialised " + mechanism + " SASL provider successfully");
     }
 
-    /**
-     * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
-     */
-    public void configure(final ConfigurationPlugin config) throws ConfigurationException
-    {
-        final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
-        final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
-
-        _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
-
-        _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
-
-        configPrincipalDatabase(_principalDatabase, pdamConfig);        
-    }
-
     public String getMechanisms()
     {
-        return _mechanisms;
-    }
-
-    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
-    {
-        return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
-                                     _callbackHandlerMap.get(mechanism));
-    }
-
-    /**
-     * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
-     */
-    public AuthenticationResult authenticate(SaslServer server, byte[] response)
-    {
-        try
+        if (_default != null)
         {
-            // Process response from the client
-            byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
-
-            if (server.isComplete())
-            {
-                final Subject subject = new Subject();
-                subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID()));
-                return new AuthenticationResult(subject);
-            }
-            else
-            {
-                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
-            }
+            // Use the default AuthenticationManager if present
+            return _default.getMechanisms();
         }
-        catch (SaslException e)
+        else
         {
-            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+            return _mechanisms;
         }
     }
 
-    /**
-     * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
-     */
-    public AuthenticationResult authenticate(final String username, final String password)
+    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
     {
-        try
+        if (_default != null)
         {
-            if (_principalDatabase.verifyPassword(username, password.toCharArray()))
-            {
-                final Subject subject = new Subject();
-                subject.getPrincipals().add(new UsernamePrincipal(username));
-                return new AuthenticationResult(subject);
-            }
-            else
-            {
-                return new AuthenticationResult(AuthenticationStatus.CONTINUE);
-            }
+            // Use the default AuthenticationManager if present
+            return _default.createSaslServer(mechanism, localFQDN);
         }
-        catch (AccountNotFoundException e)
+        else
         {
-            return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+            return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+                                         _callbackHandlerMap.get(mechanism));
         }
-    }
-
-    public void close()
-    {
-        _mechanisms = null;
-        Security.removeProvider(PROVIDER_NAME);
 
-        unregisterManagement();
     }
 
-    private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
+    public AuthenticationResult authenticate(SaslServer server, byte[] response)
     {
-        try
-        {
-            return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
-        }
-        catch (InstantiationException ie)
-        {
-            throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
-        }
-        catch (IllegalAccessException iae)
-        {
-            throw new ConfigurationException("Cannot access " + pdClazz, iae);
-        }
-        catch (ClassNotFoundException cnfe)
+        // Use the default AuthenticationManager if present
+        if (_default != null)
         {
-            throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+            return _default.authenticate(server, response);
         }
-        catch (ClassCastException cce)
-        {
-            throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
-        }
-    }
 
-    private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
-            throws ConfigurationException
-    {
-
-        final Map<String,String> attributes = config.getPdClassAttributeMap();
 
-        for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+        try
         {
-            final Entry<String, String> nameValuePair = iterator.next();
-            final String methodName = generateSetterName(nameValuePair.getKey());
-            final Method method;
-            try
-            {
-                method = principalDatabase.getClass().getMethod(methodName, String.class);
-            }
-            catch (Exception e)
-            {
-                throw new ConfigurationException("No method " + methodName + " found in class "
-                        + principalDatabase.getClass()
-                        + " hence unable to configure principal database. The method must be public and "
-                        + "have a single String argument with a void return type", e);
-            }
-            try
-            {
-                method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
-            }
-            catch (IllegalArgumentException e)
-            {
-                throw new ConfigurationException(e.getMessage(), e);
-            }
-            catch (PropertyException e)
-            {
-                throw new ConfigurationException(e.getMessage(), e);
-            }
-            catch (IllegalAccessException e)
+            // Process response from the client
+            byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
+
+            if (server.isComplete())
             {
-                throw new ConfigurationException(e.getMessage(), e);
+                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
             }
-            catch (InvocationTargetException e)
+            else
             {
-                // QPID-1347..  InvocationTargetException wraps the checked exception thrown from the reflective
-                // method call.  Pull out the underlying message and cause to make these more apparent to the user.
-                throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
             }
         }
-    }
-
-    private String generateSetterName(String argName) throws ConfigurationException
-    {
-        if ((argName == null) || (argName.length() == 0))
-        {
-            throw new ConfigurationException("Argument names must have length >= 1 character");
-        }
-
-        if (Character.isLowerCase(argName.charAt(0)))
+        catch (SaslException e)
         {
-            argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
         }
-
-        final String methodName = "set" + argName;
-        return methodName;
-    }
-
-    protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
-    {
-        _principalDatabase = principalDatabase;
     }
 
-    protected void registerManagement()
+    public void close()
     {
-        try
-        {
-            _logger.info("Registering UserManagementMBean");
-
-            _mbean = new AMQUserManagementMBean();
-            _mbean.setPrincipalDatabase(_principalDatabase);
-            _mbean.register();
-        }
-        catch (Exception e)
-        {
-            _logger.warn("User management disabled as unable to create MBean:", e);
-            _mbean = null;
-        }
+        Security.removeProvider(PROVIDER_NAME);
     }
 
-    protected void unregisterManagement()
+    public CallbackHandler getHandler(String mechanism)
     {
-        try
-        {
-            if (_mbean != null)
-            {
-                _logger.info("Unregistering UserManagementMBean");
-                _mbean.unregister();
-            }
-        }
-        catch (Exception e)
-        {
-            _logger.warn("Failed to unregister User management MBean:", e);
-        }
-        finally
-        {
-            _mbean = null;
-        }
+        return _callbackHandlerMap.get(mechanism);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Sun Aug 14 17:14:51 2011
@@ -45,10 +45,9 @@ public class AmqPlainSaslServerFactory i
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null &&
-            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-             props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+            props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org