You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 22:08:13 UTC
svn commit: r1157632 [1/2] - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/protocol/
main/java/org/apache/qpid/server/queue/
main/java/org/apache/qpid/server/security/auth/manager/ main/java/org...
Author: rgodfrey
Date: Sun Aug 14 20:08:12 2011
New Revision: 1157632
URL: http://svn.apache.org/viewvc?rev=1157632&view=rev
Log:
Undoing commits of stuff that was meant for my 1-0 sandbox
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Aug 14 20:08:12 2011
@@ -20,18 +20,6 @@
*/
package org.apache.qpid.server;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.*;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -40,28 +28,9 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.Broker.InitException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+
/**
* Main entry point for AMQPD.
@@ -69,39 +38,41 @@ import org.apache.qpid.transport.network
*/
public class Main
{
- private static Logger _logger;
-
- private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
-
- public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- public static final String QPID_HOME = "QPID_HOME";
- private static final int IPV4_ADDRESS_LENGTH = 4;
+ private final Options options = new Options();
+ private CommandLine commandLine;
- private static final char IPV4_LITERAL_SEPARATOR = '.';
- private java.util.logging.Logger FRAME_LOGGER;
- private java.util.logging.Logger RAW_LOGGER;
-
- protected static class InitException extends Exception
+ public static void main(String[] args)
{
- InitException(String msg, Throwable cause)
+ //if the -Dlog4j.configuration property has not been set, enable the init override
+ //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
+ //finds from the classpath when we get the first Loggers
+ if(System.getProperty("log4j.configuration") == null)
{
- super(msg, cause);
+ System.setProperty("log4j.defaultInitOverride", "true");
}
- }
- protected final Options options = new Options();
- protected CommandLine commandLine;
+ new Main(args);
+ }
- protected Main(String[] args)
+ public Main(final String[] args)
{
setOptions(options);
if (parseCommandline(args))
{
- execute();
+ try
+ {
+ execute();
+ }
+ catch(Exception e)
+ {
+ System.err.println("Exception during startup: " + e);
+ e.printStackTrace();
+ shutdown(1);
+ }
}
}
- protected boolean parseCommandline(String[] args)
+ protected boolean parseCommandline(final String[] args)
{
try
{
@@ -119,8 +90,7 @@ public class Main
}
}
- @SuppressWarnings("static-access")
- protected void setOptions(Options options)
+ protected void setOptions(final Options options)
{
Option help = new Option("h", "help", false, "print this message");
Option version = new Option("v", "version", false, "print the version information and exit");
@@ -164,16 +134,21 @@ public class Main
Option bind =
OptionBuilder.withArgName("bind").hasArg()
.withDescription("bind to the specified address. Overrides any value in the config file")
- .withLongOpt("bind").create("b");
+ .withLongOpt("bind").create(BrokerOptions.BIND);
Option logconfig =
OptionBuilder.withArgName("logconfig").hasArg()
.withDescription("use the specified log4j xml configuration file. By "
- + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
- + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+ + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG);
Option logwatchconfig =
OptionBuilder.withArgName("logwatch").hasArg()
.withDescription("monitor the log file configuration file for changes. Units are seconds. "
- + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH);
+
+ Option sslport =
+ OptionBuilder.withArgName("sslport").hasArg()
+ .withDescription("SSL port. Overrides any value in the config file")
+ .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS);
options.addOption(help);
options.addOption(version);
@@ -187,472 +162,120 @@ public class Main
options.addOption(exclude0_8);
options.addOption(mport);
options.addOption(bind);
+ options.addOption(sslport);
}
- protected void execute()
+ protected void execute() throws Exception
{
- // note this understands either --help or -h. If an option only has a long name you can use that but if
- // an option has a short name and a long name you must use the short name here.
- if (commandLine.hasOption("h"))
+ BrokerOptions options = new BrokerOptions();
+ String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG);
+ if(configFile != null)
{
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("Qpid", options, true);
+ options.setConfigFile(configFile);
}
- else if (commandLine.hasOption("v"))
- {
- String ver = QpidProperties.getVersionString();
-
- StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
-
- boolean first = true;
- for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
- {
- if (first)
- {
- first = false;
- }
- else
- {
- protocol.append(", ");
- }
-
- protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
-
- }
- System.out.println(ver + " (" + protocol + ")");
- }
- else
+ String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH);
+ if(logWatchConfig != null)
{
- try
- {
- CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
- startup();
- CurrentActor.remove();
- }
- catch (InitException e)
- {
- System.out.println("Initialisation Error : " + e.getMessage());
- shutdown(1);
- }
- catch (Throwable e)
- {
- System.out.println("Error initialising message broker: " + e);
- e.printStackTrace();
- shutdown(1);
- }
+ options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
}
- }
-
- protected void shutdown(int status)
- {
- ApplicationRegistry.removeAll();
- System.exit(status);
- }
-
- protected void startup() throws Exception
- {
-
- FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
- RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
-
- final String QpidHome = System.getProperty(QPID_HOME);
- final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
- final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
- if (!configFile.exists())
- {
- String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
- if (QpidHome == null)
- {
- error = error + "\nNote: " + QPID_HOME + " is not set.";
- }
-
- throw new InitException(error, null);
- }
- else
+ String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG);
+ if(logConfig != null)
{
- CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
+ options.setLogConfigFile(logConfig);
}
- String logConfig = commandLine.getOptionValue("l");
- String logWatchConfig = commandLine.getOptionValue("w", "0");
-
- int logWatchTime = 0;
- try
- {
- logWatchTime = Integer.parseInt(logWatchConfig);
- }
- catch (NumberFormatException e)
+ String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT);
+ if(jmxPort != null)
{
- System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
- + "a non-negative integer. Using default of zero (no watching configured");
+ options.setJmxPort(Integer.parseInt(jmxPort));
}
- File logConfigFile;
- if (logConfig != null)
- {
- logConfigFile = new File(logConfig);
- configureLogging(logConfigFile, logWatchTime);
- }
- else
+ String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND);
+ if (bindAddr != null)
{
- File configFileDirectory = configFile.getParentFile();
- logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
- configureLogging(logConfigFile, logWatchTime);
+ options.setBind(bindAddr);
}
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
- ServerConfiguration serverConfig = config.getConfiguration();
- updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
-
- ApplicationRegistry.initialise(config);
-
- // We have already loaded the BrokerMessages class by this point so we
- // need to refresh the locale setting incase we had a different value in
- // the configuration.
- BrokerMessages.reload();
-
- // AR.initialise() sets and removes its own actor so we now need to set the actor
- // for the remainder of the startup, and the default actor if the stack is empty
- CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
- CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
- GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-
-
- try
+ String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS);
+ if(portStr != null)
{
- configureLoggingManagementMBean(logConfigFile, logWatchTime);
-
- ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
- configMBean.register();
-
- ServerInformationMBean sysInfoMBean =
- new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
- sysInfoMBean.register();
-
-
- String[] portStr = commandLine.getOptionValues("p");
-
- Set<Integer> ports = new HashSet<Integer>();
- Set<Integer> exclude_0_10 = new HashSet<Integer>();
- Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
- Set<Integer> exclude_0_9 = new HashSet<Integer>();
- Set<Integer> exclude_0_8 = new HashSet<Integer>();
-
- if(portStr == null || portStr.length == 0)
- {
-
- parsePortList(ports, serverConfig.getPorts());
- parsePortList(exclude_0_10, serverConfig.getPortExclude010());
- parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
- parsePortList(exclude_0_9, serverConfig.getPortExclude09());
- parsePortList(exclude_0_8, serverConfig.getPortExclude08());
-
- }
- else
- {
- parsePortArray(ports, portStr);
- parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
- parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
- parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
- parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
-
- }
-
-
-
-
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = serverConfig.getBind();
- }
- InetAddress bindAddress = null;
-
-
-
- if (bindAddr.equals("wildcard"))
+ parsePortArray(options, portStr, false);
+ for(ProtocolExclusion pe : ProtocolExclusion.values())
{
- bindAddress = new InetSocketAddress(0).getAddress();
+ parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
}
- else
- {
- bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
- }
-
- String hostName = bindAddress.getCanonicalHostName();
-
-
- String keystorePath = serverConfig.getKeystorePath();
- String keystorePassword = serverConfig.getKeystorePassword();
- String certType = serverConfig.getCertType();
- SSLContextFactory sslFactory = null;
-
- if (!serverConfig.getSSLOnly())
- {
-
- for(int port : ports)
- {
-
- NetworkDriver driver = new MINANetworkDriver();
-
- Set<VERSION> supported = EnumSet.allOf(VERSION.class);
-
- if(exclude_0_10.contains(port))
- {
- supported.remove(VERSION.v0_10);
- }
-
- if(exclude_0_9_1.contains(port))
- {
- supported.remove(VERSION.v0_9_1);
- }
- if(exclude_0_9.contains(port))
- {
- supported.remove(VERSION.v0_9);
- }
- if(exclude_0_8.contains(port))
- {
- supported.remove(VERSION.v0_8);
- }
-
- MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(hostName, supported);
-
-
-
- driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
- serverConfig.getNetworkConfiguration(), null);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
-
- }
-
- }
-
- if (serverConfig.getEnableSSL())
- {
- sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
- NetworkDriver driver = new MINANetworkDriver();
- driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
- new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
- }
-
- CurrentActor.get().message(BrokerMessages.READY());
-
}
- finally
- {
- // Startup is complete so remove the AR initialised Startup actor
- CurrentActor.remove();
- }
-
-
- }
-
- private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
- {
- java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
- logger.setLevel(Level.FINE);
- Formatter formatter = new Formatter()
+ String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS);
+ if(sslPortStr != null)
{
- @Override
- public String format(final LogRecord record)
+ parsePortArray(options, sslPortStr, true);
+ for(ProtocolExclusion pe : ProtocolExclusion.values())
{
-
- return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+ parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
}
- };
- for(Handler handler : logger.getHandlers())
- {
- logger.removeHandler(handler);
}
- Handler handler = new ConsoleHandler();
-
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
-
- logger.addHandler(handler);
-
-
- handler = new FileHandler(logFileName, true);
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
+
+ startBroker(options);
+ }
- logger.addHandler(handler);
- return logger;
+ protected void startBroker(final BrokerOptions options) throws Exception
+ {
+ Broker broker = new Broker();
+ broker.startup(options);
}
- private void parsePortArray(Set<Integer> ports, String[] portStr)
- throws InitException
+ protected void shutdown(final int status)
{
- if(portStr != null)
- {
- for(int i = 0; i < portStr.length; i++)
- {
- try
- {
- ports.add(Integer.parseInt(portStr[i]));
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port: " + portStr[i], e);
- }
- }
- }
+ ApplicationRegistry.remove();
+ System.exit(status);
}
- private void parsePortList(Set<Integer> output, List input)
- throws InitException
+ private static void parsePortArray(final BrokerOptions options,final Object[] ports,
+ final boolean ssl) throws InitException
{
- if(input != null)
+ if(ports != null)
{
- for(Object port : input)
+ for(int i = 0; i < ports.length; i++)
{
try
{
- output.add(Integer.parseInt(String.valueOf(port)));
+ if(ssl)
+ {
+ options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
+ }
+ else
+ {
+ options.addPort(Integer.parseInt(String.valueOf(ports[i])));
+ }
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid port: " + port, e);
+ throw new InitException("Invalid port: " + ports[i], e);
}
}
}
}
- /**
- * Update the configuration data with the management port.
- * @param configuration
- * @param managementPort The string from the command line
- */
- private void updateManagementPort(ServerConfiguration configuration, String managementPort)
+ private static void parsePortArray(final BrokerOptions options, final Object[] ports,
+ final ProtocolExclusion excludedProtocol) throws InitException
{
- if (managementPort != null)
+ if(ports != null)
{
- try
- {
- configuration.setJMXManagementPort(Integer.parseInt(managementPort));
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
- }
- }
- }
-
- public static void main(String[] args)
- {
- //if the -Dlog4j.configuration property has not been set, enable the init override
- //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
- //finds from the classpath when we get the first Loggers
- if(System.getProperty("log4j.configuration") == null)
- {
- System.setProperty("log4j.defaultInitOverride", "true");
- }
-
- //now that the override status is know, we can instantiate the Loggers
- _logger = Logger.getLogger(Main.class);
-
- new Main(args);
- }
-
- private byte[] parseIP(String address) throws Exception
- {
- char[] literalBuffer = address.toCharArray();
- int byteCount = 0;
- int currByte = 0;
- byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
- for (int i = 0; i < literalBuffer.length; i++)
- {
- char currChar = literalBuffer[i];
- if ((currChar >= '0') && (currChar <= '9'))
- {
- currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
- }
-
- if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
- {
- ip[byteCount++] = (byte) currByte;
- currByte = 0;
- }
- }
-
- if (byteCount != 4)
- {
- throw new Exception("Invalid IP address: " + address);
- }
- return ip;
- }
-
- private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
- {
- if (logConfigFile.exists() && logConfigFile.canRead())
- {
- CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
-
- if (logWatchTime > 0)
- {
- System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
- + logWatchTime + " seconds");
- // log4j expects the watch interval in milliseconds
- try
- {
- QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
- }
- catch (Exception e)
- {
- throw new InitException(e.getMessage(),e);
- }
- }
- else
+ for(int i = 0; i < ports.length; i++)
{
try
{
- QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ options.addExcludedPort(excludedProtocol,
+ Integer.parseInt(String.valueOf(ports[i])));
}
- catch (Exception e)
- {
- throw new InitException(e.getMessage(),e);
- }
- }
- }
- else
- {
- System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
- System.err.println("Using the fallback internal log4j.properties configuration");
-
- InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
- if(propsFile == null)
- {
- throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
- }
- else
- {
- try
- {
- Properties fallbackProps = new Properties();
- fallbackProps.load(propsFile);
- PropertyConfigurator.configure(fallbackProps);
- }
- finally
+ catch (NumberFormatException e)
{
- propsFile.close();
+ throw new InitException("Invalid port for exclusion: " + ports[i], e);
}
}
}
}
-
- private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
- {
- LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
- blm.register();
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sun Aug 14 20:08:12 2011
@@ -20,56 +20,44 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-
-import java.util.Set;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- ;
-
-
- public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
-
- private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+ private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
- private final Set<VERSION> _supported;
-
+ private final Set<AmqpProtocolVersion> _supported;
public MultiVersionProtocolEngineFactory()
{
- this(1, "localhost", ALL_VERSIONS);
- }
-
- public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
- {
- this(1, fqdn, versions);
+ this("localhost", ALL_VERSIONS);
}
-
public MultiVersionProtocolEngineFactory(String fqdn)
{
- this(1, fqdn, ALL_VERSIONS);
+ this(fqdn, ALL_VERSIONS);
}
- public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
{
- _appRegistry = ApplicationRegistry.getInstance(instance);
+ _appRegistry = ApplicationRegistry.getInstance();
_fqdn = fqdn;
_supported = supportedVersions;
}
-
- public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Aug 14 20:08:12 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
/** null means shared */
private final AMQShortString _owner;
- private PrincipalHolder _prinicpalHolder;
+ private AuthorizationHolder _authorizationHolder;
private boolean _exclusive = false;
private AMQSessionModel _exclusiveOwner;
@@ -102,9 +102,7 @@ public class SimpleAMQQueue implements A
protected final QueueEntryList _entries;
- protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-
- private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+ protected final SubscriptionList _subscriptionList = new SubscriptionList();
private volatile Subscription _exclusiveSubscriber;
@@ -188,7 +186,7 @@ public class SimpleAMQQueue implements A
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
private ConfigurationPlugin _queueConfiguration;
- private final boolean _isTopic;
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -234,12 +232,10 @@ public class SimpleAMQQueue implements A
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
+ _arguments = arguments;
_id = virtualHost.getConfigStore().createId();
- _isTopic = arguments != null && arguments.containsKey("topic");
-
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -331,7 +327,7 @@ public class SimpleAMQQueue implements A
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive) throws AMQException
{
_exclusive = exclusive;
@@ -375,14 +371,14 @@ public class SimpleAMQQueue implements A
return _owner;
}
- public PrincipalHolder getPrincipalHolder()
+ public AuthorizationHolder getAuthorizationHolder()
{
- return _prinicpalHolder;
+ return _authorizationHolder;
}
- public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+ public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
{
- _prinicpalHolder = prinicpalHolder;
+ _authorizationHolder = authorizationHolder;
}
@@ -406,8 +402,8 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -437,14 +433,14 @@ public class SimpleAMQQueue implements A
subscription.setNoLocal(_nolocal);
}
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -490,11 +486,6 @@ public class SimpleAMQQueue implements A
// queue. This is because the delete method uses the subscription set which has just been cleared
subscription.queueDeleted(this);
}
-
- if(_subscriptionList.size() == 0 && _isTopic)
- {
- clearQueue();
- }
}
}
@@ -521,10 +512,10 @@ public class SimpleAMQQueue implements A
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -550,7 +541,7 @@ public class SimpleAMQQueue implements A
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -577,104 +568,101 @@ public class SimpleAMQQueue implements A
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
+ incrementTxnEnqueueStats(message);
+ incrementQueueCount();
+ incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
+ QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
- if(!_isTopic || _subscriptionList.size()!=0)
- {
- incrementTxnEnqueueStats(message);
- incrementQueueCount();
- incrementQueueSize(message);
- QueueEntry entry;
+ if (exclusiveSub != null)
+ {
+ exclusiveSub.getSendLock();
- if (exclusiveSub != null)
+ try
{
- exclusiveSub.getSendLock();
-
- try
- {
- entry = _entries.add(message);
+ entry = _entries.add(message);
- deliverToSubscription(exclusiveSub, entry);
- }
- finally
- {
- exclusiveSub.releaseSendLock();
- }
+ deliverToSubscription(exclusiveSub, entry);
}
- else
+ finally
{
- entry = _entries.add(message);
- /*
+ exclusiveSub.releaseSendLock();
+ }
+ }
+ else
+ {
+ entry = _entries.add(message);
+ /*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if (nextNode == null)
+ */
+ SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+ SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ if (nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().findNext();
+ }
+ while (nextNode != null)
+ {
+ if (_subscriptionList.updateMarkedNode(node, nextNode))
{
- nextNode = _subscriptionList.getHead().getNext();
+ break;
}
- while (nextNode != null)
+ else
{
- if (_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
+ node = _subscriptionList.getMarkedNode();
+ nextNode = node.findNext();
+ if (nextNode == null)
{
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
+ nextNode = _subscriptionList.getHead().findNext();
}
}
+ }
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ while (entry.isAvailable() && loops != 0)
+ {
+ if (nextNode == null)
{
- if (nextNode == null)
- {
- loops--;
- nextNode = _subscriptionList.getHead();
- }
- else
- {
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
- }
- nextNode = nextNode.getNext();
-
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
}
+ nextNode = nextNode.findNext();
+
}
+ }
- if (!(entry.isAcquired() || entry.isDeleted()))
- {
- checkSubscriptionsNotAheadOfDelivery(entry);
+ if (entry.isAvailable())
+ {
+ checkSubscriptionsNotAheadOfDelivery(entry);
- deliverAsync();
- }
+ deliverAsync();
+ }
- if(_managedObject != null)
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
+ if(_managedObject != null)
+ {
+ _managedObject.checkForNotification(entry.getMessage());
+ }
- if(action != null)
- {
- action.onEnqueue(entry);
- }
+ if(action != null)
+ {
+ action.onEnqueue(entry);
}
+
}
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -730,20 +718,20 @@ public class SimpleAMQQueue implements A
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
SessionConfig session = message.getSessionConfig();
-
+
if(session !=null && session.isTransactional())
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -757,40 +745,6 @@ public class SimpleAMQQueue implements A
incrementUnackedMsgCount();
sub.send(entry);
-
- if(_isTopic)
- {
- if(allSubscriptionsAhead(entry) && entry.acquire())
- {
- entry.discard();
- }
- }
- }
-
- private boolean allSubscriptionsAhead(final QueueEntry entry)
- {
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
- {
- QueueContext context = (QueueContext) subscription.getQueueContext();
- if(context != null)
- {
- QueueEntry subnode = context._lastSeenEntry;
- if(subnode.compareTo(entry)<0)
- {
- return false;
- }
- }
- else
- {
- return false;
- }
- }
- }
- return true;
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -849,24 +803,6 @@ public class SimpleAMQQueue implements A
}
- public void requeue(QueueEntryImpl entry, Subscription subscription)
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
-
- deliverAsync();
- }
-
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
@@ -875,7 +811,7 @@ public class SimpleAMQQueue implements A
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -932,7 +868,7 @@ public class SimpleAMQQueue implements A
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -1004,7 +940,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDeleted())
+ if (node != null && !node.isDispensed())
{
entryList.add(node);
}
@@ -1108,7 +1044,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
+ if (!node.isDispensed() && filter.accept(node))
{
entryList.add(node);
}
@@ -1302,7 +1238,6 @@ public class SimpleAMQQueue implements A
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
- && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1332,7 +1267,7 @@ public class SimpleAMQQueue implements A
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1342,7 +1277,7 @@ public class SimpleAMQQueue implements A
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1353,7 +1288,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1362,7 +1297,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1420,7 +1355,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -1629,7 +1564,7 @@ public class SimpleAMQQueue implements A
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1648,52 +1583,6 @@ public class SimpleAMQQueue implements A
_asyncDelivery.execute(flusher);
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1714,9 +1603,12 @@ public class SimpleAMQQueue implements A
{
sub.getSendLock();
atTail = attemptDelivery(sub);
- if (atTail && getNextAvailableEntry(sub) == null)
+ if (atTail && sub.isAutoClose())
{
- sub.queueEmpty();
+ unregisterSubscription(sub);
+
+ sub.confirmAutoClose();
+
}
else if (!atTail)
{
@@ -1737,7 +1629,6 @@ public class SimpleAMQQueue implements A
{
advanceAllSubscriptions();
}
-
return atTail;
}
@@ -1760,7 +1651,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && !(node.isAcquired() || node.isDeleted()))
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node))
{
@@ -1821,7 +1712,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1850,14 +1741,40 @@ public class SimpleAMQQueue implements A
}
- private void processQueue(Runnable runner) throws AMQException
+ /**
+ * Used by queue Runners to asynchronously deliver messages to consumers.
+ *
+ * A queue Runner is started whenever a state change occurs, e.g when a new
+ * message arrives on the queue and cannot be immediately delivered to a
+ * subscription (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * capable of accepting/delivering all messages then these messages would
+ * otherwise remain on the queue.
+ *
+ * processQueue should be running while there are messages on the queue AND
+ * there are subscriptions that can deliver them. If there are no
+ * subscriptions capable of delivering the remaining messages on the queue
+ * then processQueue should stop to prevent spinning.
+ *
+ * Since processQueue runs in a fixed size Executor, it should not run
+ * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+ * incoming messages may not be able to be scheduled in the thread pool
+ * because all threads are working on clearing down large queues). To solve
+ * this problem, after an arbitrary number of message deliveries the
+ * processQueue job stops iterating, resubmits itself to the executor, and
+ * ends the current instance
+ *
+ * @param runner the Runner to schedule
+ * @throws AMQException
+ */
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- int extraLoops = 1;
- long iterations = MAX_ASYNC_DELIVERIES;
+ boolean lastLoop = false;
+ int iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1874,12 +1791,14 @@ public class SimpleAMQQueue implements A
if (previousStateChangeCount != stateChangeCount)
{
- extraLoops = 1;
+ //further asynchronous delivery is required since the
+ //previous loop. keep going if iteration slicing allows.
+ lastLoop = false;
}
previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done;
+ boolean allSubscriptionsDone = true;
+ boolean subscriptionDone;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1889,29 +1808,25 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
try
{
-
- done = attemptDelivery(sub);
-
- if (done)
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub);
+ if (subscriptionDone)
{
- if (extraLoops == 0)
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && sub.isAutoClose())
{
- if(getNextAvailableEntry(sub) == null)
- {
- sub.queueEmpty();
- }
- deliveryIncomplete = false;
+ unregisterSubscription(sub);
- }
- else
- {
- extraLoops--;
+ sub.confirmAutoClose();
}
}
else
{
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
iterations--;
- extraLoops = 1;
}
}
finally
@@ -1919,10 +1834,34 @@ public class SimpleAMQQueue implements A
sub.releaseSendLock();
}
}
+
+ if(allSubscriptionsDone && lastLoop)
+ {
+ //We have done an extra loop already and there are
+ //again no further delivery attempts possible, only
+ //keep going if state change demands it.
+ deliveryIncomplete = false;
+ }
+ else if(allSubscriptionsDone)
+ {
+ //All subscriptions reported being done, but we have to do
+ //an extra loop if the iterations are not exhausted and
+ //there is still any work to be done
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ lastLoop = true;
+ }
+ else
+ {
+ //some subscriptions can still accept more messages,
+ //keep going if iteration count allows.
+ lastLoop = false;
+ deliveryIncomplete = true;
+ }
+
_asynchronousRunner.set(null);
}
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
@@ -1942,8 +1881,8 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted
- if (!node.isDeleted())
+ // Only process nodes that are not currently deleted and not dequeued
+ if (!node.isDispensed())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
@@ -2207,22 +2146,22 @@ public class SimpleAMQQueue implements A
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2259,21 +2198,21 @@ public class SimpleAMQQueue implements A
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
+
public void decrementUnackedMsgCount()
{
_unackedMsgCount.decrementAndGet();
}
-
+
private void incrementUnackedMsgCount()
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
@@ -2283,4 +2222,9 @@ public class SimpleAMQQueue implements A
}
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Sun Aug 14 20:08:12 2011
@@ -20,20 +20,73 @@
*/
package org.apache.qpid.server.security.auth.manager;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-public interface AuthenticationManager extends Closeable, CallbackHanderSource
+/**
+ * Implementations of the AuthenticationManager are responsible for determining
+ * the authenticity of a user's credentials.
+ *
+ * If the authentication is successful, the manager is responsible for producing a populated
+ * {@link Subject} containing the user's identity and zero or more principals representing
+ * groups to which the user belongs.
+ * <p>
+ * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
+ * the manager. The {@link #close()} method must reverse this registration.
+ *
+ */
+public interface AuthenticationManager extends Closeable, Plugin
{
+ /** The name for the required SASL Server mechanisms */
+ public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+
+ /**
+ * Initialise the authentication plugin.
+ *
+ */
+ void initialise();
+
+ /**
+ * Gets the SASL mechanisms known to this manager.
+ *
+ * @return SASL mechanism names, space separated.
+ */
String getMechanisms();
+ /**
+ * Creates a SASL server for the specified mechanism name for the given
+ * fully qualified domain name.
+ *
+ * @param mechanism mechanism name
+ * @param localFQDN domain name
+ *
+ * @return SASL server
+ * @throws SaslException
+ */
SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+ /**
+ * Authenticates a user using SASL negotiation.
+ *
+ * @param server SASL server
+ * @param response SASL response to process
+ *
+ * @return authentication result
+ */
AuthenticationResult authenticate(SaslServer server, byte[] response);
-
+ /**
+ * Authenticates a user using their username and password.
+ *
+ * @param username username
+ * @param password password
+ *
+ * @return authentication result
+ */
+ AuthenticationResult authenticate(String username, String password);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Sun Aug 14 20:08:12 2011
@@ -20,27 +20,65 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import org.apache.log4j.Logger;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.log4j.Logger;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.SaslServerFactory;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.Sasl;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-import java.security.Security;
+/**
+ * Concrete implementation of the AuthenticationManager that determines if supplied
+ * user credentials match those appearing in a PrincipalDatabase. The implementation
+ * of the PrincipalDatabase is determined from the configuration.
+ *
+ * This implementation also registers the JMX UserManagement MBean.
+ *
+ * This plugin expects configuration such as:
+ *
+ * <pre>
+ * <pd-auth-manager>
+ * <principal-database>
+ * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ * <attributes>
+ * <attribute>
+ * <name>passwordFile</name>
+ * <value>${conf}/passwd</value>
+ * </attribute>
+ * </attributes>
+ * </principal-database>
+ * </pd-auth-manager>
+ * </pre>
+ */
public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
{
private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
@@ -49,55 +87,109 @@ public class PrincipalDatabaseAuthentica
private String _mechanisms;
/** Maps from the mechanism to the callback handler to use for handling those requests */
- private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+ private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
/**
* Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
* details of the use of these properties. This map is populated during initialisation of each provider.
*/
- private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+ private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
- private AuthenticationManager _default = null;
- /** The name for the required SASL Server mechanisms */
- public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+ protected PrincipalDatabase _principalDatabase = null;
- public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception
- {
- _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'")
- + " PrincipalDatabase authentication manager.");
+ protected AMQUserManagementMBean _mbean = null;
- // Fixme This should be done per Vhost but allowing global hack isn't right but ...
- // required as authentication is done before Vhost selection
+ public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
+ {
+ public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
+ {
+ final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
- Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+ // If there is no configuration for this plugin then don't load it.
+ if (configuration == null)
+ {
+ _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
+ return null;
+ }
+ final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
+ pdam.configure(configuration);
+ pdam.initialise();
+ return pdam;
+ }
- if (name == null || hostConfig == null)
+ public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
{
- initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
+ return PrincipalDatabaseAuthenticationManager.class;
}
- else
+
+ public String getPluginName()
{
- String databaseName = hostConfig.getAuthenticationDatabase();
+ return PrincipalDatabaseAuthenticationManager.class.getName();
+ }
+ };
- if (databaseName == null)
+ public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
+
+ public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+ {
+ public List<String> getParentPaths()
{
-
- _default = ApplicationRegistry.getInstance().getAuthenticationManager();
- return;
+ return Arrays.asList("security.pd-auth-manager");
}
- else
+
+ public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
{
- PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName);
+ final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
+
+ instance.setConfiguration(path, config);
+ return instance;
+ }
+ };
+
+ public String[] getElementsProcessed()
+ {
+ return new String[] {"principal-database.class",
+ "principal-database.attributes.attribute.name",
+ "principal-database.attributes.attribute.value"};
+ }
- if (database == null)
- {
- throw new ConfigurationException("Requested database:" + databaseName + " was not found");
- }
+ public void validateConfiguration() throws ConfigurationException
+ {
+ }
+
+ public String getPrincipalDatabaseClass()
+ {
+ return _configuration.getString("principal-database.class");
+ }
+
+ public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
+ {
+ final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
+ final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
+ final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
- initialiseAuthenticationMechanisms(providerMap, database);
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i);
+ final String argValue = argumentValues.get(i);
+
+ attributes.put(argName, argValue);
}
+
+ return Collections.unmodifiableMap(attributes);
}
+ }
+
+ protected PrincipalDatabaseAuthenticationManager()
+ {
+ }
+
+ public void initialise()
+ {
+ final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+
+ initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
if (providerMap.size() > 0)
{
@@ -110,33 +202,16 @@ public class PrincipalDatabaseAuthentica
{
_logger.info("Additional SASL providers successfully registered.");
}
-
}
else
{
_logger.warn("No additional SASL providers registered.");
}
+ registerManagement();
}
-
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception
- {
- if (databases.size() > 1)
- {
- _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
- }
-
- for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
- {
- // fixme As the database now provide the mechanisms they support, they will ...
- // overwrite each other in the map. There should only be one database per vhost.
- // But currently we must have authentication before vhost definition.
- initialiseAuthenticationMechanisms(providerMap, entry.getValue());
- }
- }
-
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
{
if (database == null || database.getMechanisms().size() == 0)
{
@@ -152,7 +227,6 @@ public class PrincipalDatabaseAuthentica
private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
Map<String, Class<? extends SaslServerFactory>> providerMap)
- throws Exception
{
if (_mechanisms == null)
{
@@ -173,43 +247,37 @@ public class PrincipalDatabaseAuthentica
_logger.info("Initialised " + mechanism + " SASL provider successfully");
}
+ /**
+ * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
+ */
+ public void configure(final ConfigurationPlugin config) throws ConfigurationException
+ {
+ final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
+ final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
+
+ _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
+
+ _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
+
+ configPrincipalDatabase(_principalDatabase, pdamConfig);
+ }
+
public String getMechanisms()
{
- if (_default != null)
- {
- // Use the default AuthenticationManager if present
- return _default.getMechanisms();
- }
- else
- {
- return _mechanisms;
- }
+ return _mechanisms;
}
public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
{
- if (_default != null)
- {
- // Use the default AuthenticationManager if present
- return _default.createSaslServer(mechanism, localFQDN);
- }
- else
- {
- return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
- _callbackHandlerMap.get(mechanism));
- }
-
+ return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+ _callbackHandlerMap.get(mechanism));
}
+ /**
+ * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
+ */
public AuthenticationResult authenticate(SaslServer server, byte[] response)
{
- // Use the default AuthenticationManager if present
- if (_default != null)
- {
- return _default.authenticate(server, response);
- }
-
-
try
{
// Process response from the client
@@ -217,7 +285,9 @@ public class PrincipalDatabaseAuthentica
if (server.isComplete())
{
- return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
+ final Subject subject = new Subject();
+ subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID()));
+ return new AuthenticationResult(subject);
}
else
{
@@ -230,13 +300,164 @@ public class PrincipalDatabaseAuthentica
}
}
+ /**
+ * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
+ */
+ public AuthenticationResult authenticate(final String username, final String password)
+ {
+ try
+ {
+ if (_principalDatabase.verifyPassword(username, password.toCharArray()))
+ {
+ final Subject subject = new Subject();
+ subject.getPrincipals().add(new UsernamePrincipal(username));
+ return new AuthenticationResult(subject);
+ }
+ else
+ {
+ return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ }
+ }
+ catch (AccountNotFoundException e)
+ {
+ return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ }
+ }
+
public void close()
{
+ _mechanisms = null;
Security.removeProvider(PROVIDER_NAME);
+
+ unregisterManagement();
}
- public CallbackHandler getHandler(String mechanism)
+ private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
{
- return _callbackHandlerMap.get(mechanism);
+ try
+ {
+ return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
+ }
+ catch (InstantiationException ie)
+ {
+ throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
+ }
+ catch (IllegalAccessException iae)
+ {
+ throw new ConfigurationException("Cannot access " + pdClazz, iae);
+ }
+ catch (ClassNotFoundException cnfe)
+ {
+ throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+ }
+ catch (ClassCastException cce)
+ {
+ throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
+ }
+ }
+
+ private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
+ throws ConfigurationException
+ {
+
+ final Map<String,String> attributes = config.getPdClassAttributeMap();
+
+ for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+ {
+ final Entry<String, String> nameValuePair = iterator.next();
+ final String methodName = generateSetterName(nameValuePair.getKey());
+ final Method method;
+ try
+ {
+ method = principalDatabase.getClass().getMethod(methodName, String.class);
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("No method " + methodName + " found in class "
+ + principalDatabase.getClass()
+ + " hence unable to configure principal database. The method must be public and "
+ + "have a single String argument with a void return type", e);
+ }
+ try
+ {
+ method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (PropertyException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (InvocationTargetException e)
+ {
+ // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective
+ // method call. Pull out the underlying message and cause to make these more apparent to the user.
+ throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+ }
+ }
+ }
+
+ private String generateSetterName(String argName) throws ConfigurationException
+ {
+ if ((argName == null) || (argName.length() == 0))
+ {
+ throw new ConfigurationException("Argument names must have length >= 1 character");
+ }
+
+ if (Character.isLowerCase(argName.charAt(0)))
+ {
+ argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+ }
+
+ final String methodName = "set" + argName;
+ return methodName;
+ }
+
+ protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
+ {
+ _principalDatabase = principalDatabase;
+ }
+
+ protected void registerManagement()
+ {
+ try
+ {
+ _logger.info("Registering UserManagementMBean");
+
+ _mbean = new AMQUserManagementMBean();
+ _mbean.setPrincipalDatabase(_principalDatabase);
+ _mbean.register();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("User management disabled as unable to create MBean:", e);
+ _mbean = null;
+ }
+ }
+
+ protected void unregisterManagement()
+ {
+ try
+ {
+ if (_mbean != null)
+ {
+ _logger.info("Unregistering UserManagementMBean");
+ _mbean.unregister();
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Failed to unregister User management MBean:", e);
+ }
+ finally
+ {
+ _mbean = null;
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -45,9 +45,10 @@ public class AmqPlainSaslServerFactory i
public String[] getMechanismNames(Map props)
{
- if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE)))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -47,10 +47,11 @@ public class AnonymousSaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE) ||
- props.containsKey(Sasl.POLICY_NOANONYMOUS)))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE) ||
+ props.containsKey(Sasl.POLICY_NOANONYMOUS)))
{
// returned array must be non null according to interface documentation
return new String[0];
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1157632&r1=1157631&r2=1157632&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java Sun Aug 14 20:08:12 2011
@@ -45,9 +45,10 @@ public class PlainSaslServerFactory impl
public String[] getMechanismNames(Map props)
{
- if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE)))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org